package agent_server import ( "errors" "github.com/gorilla/websocket" "github.com/hashicorp/go-uuid" "log" "nginx-ui/server/models" "sync" "time" ) const ( // Time allowed to write a message to the peer. writeWait = 12 * time.Second readWait = 12 * time.Second // Time allowed to read the next pong message from the peer. ping period must be less pongWait = 10 * time.Second // Send pings to peer with this period. Must be less than pongWait. pingPeriod = (pongWait * 9) / 10 // Maximum message size allowed from peer. maxMessageSize = 4096 ) // WsClient is a middleman between the websocket connection and the hub. type WsClient struct { Token string `json:"id"` // The websocket connection. conn *websocket.Conn // Buffered channel of outbound messages. send chan []byte callbacks map[string]chan *models.AgentData lock *sync.Mutex } func NewWsClient(token string, conn *websocket.Conn) *WsClient { return &WsClient{ Token: token, conn: conn, send: make(chan []byte, 4096), callbacks: make(map[string]chan *models.AgentData), } } // readPump pumps messages from the websocket connection to the hub. // // The application runs readPump in a per-connection goroutine. The application // ensures that there is at most one reader on a connection by executing all // reads from this goroutine. func (c *WsClient) readPump() { defer func() { AgentHub.unregister <- c log.Printf("readPump closed") }() c.conn.SetReadLimit(maxMessageSize) _ = c.conn.SetReadDeadline(time.Now().Add(readWait)) _ = c.conn.SetWriteDeadline(time.Now().Add(writeWait)) c.conn.SetPingHandler(func(string) error { log.Printf("ping: %v", time.Now()) _ = c.conn.SetReadDeadline(time.Now().Add(readWait)) _ = c.conn.SetWriteDeadline(time.Now().Add(writeWait)) _ = c.conn.WriteMessage(websocket.PongMessage, []byte{}) return nil }) for { agentData := &models.AgentData{} err := c.conn.ReadJSON(&agentData) log.Printf("recv: %v", err) if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { log.Printf("error: %v", err) } log.Printf("read message: %v", err) break } r, ok := c.callbacks[agentData.RequestId] if ok { r <- agentData continue } handler, ok := AgentHub.handlers[agentData.Type] if ok { go func() { result, err := handler(c, agentData) log.Printf("result: %v,err: %v", result, err) if err != nil { agentData.Success = false agentData.Data = make(map[string]interface{}) agentData.Msg = err.Error() } else { agentData.Success = true agentData.Data = result agentData.Msg = "" } c.JustSend(agentData) }() } } } /* Send 发送消息 参数: message - 消息体 result - 返回结构体指针 timeout - 超时时间 返回值: error - 错误 */ func (c *WsClient) Send(data *models.AgentData, timeout time.Duration) (*models.AgentData, error) { var rec = make(chan *models.AgentData) if data.RequestId == "" { id, _ := uuid.GenerateUUID() data.RequestId = id } c.callbacks[data.RequestId] = rec timer := time.NewTimer(timeout) defer func() { delete(c.callbacks, data.RequestId) timer.Stop() }() err := c.conn.WriteJSON(data) if err != nil { return nil, err } for { select { case res, ok := <-rec: if !ok { return nil, errors.New("receive data fail") } return res, nil case <-timer.C: return nil, errors.New("timeout") } } } // JustSend 仅发送,不管是否成功,用于消息回复 func (c *WsClient) JustSend(message interface{}) { err := c.conn.WriteJSON(message) log.Printf("JustSend: message: %s, %v", message, err) } func (c *WsClient) Close() { _ = c.conn.Close() close(c.send) }