123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- package agent_server
- import (
- "errors"
- "github.com/gorilla/websocket"
- "github.com/hashicorp/go-uuid"
- "log"
- "nginx-ui/server/models"
- "time"
- )
// Timing and size limits applied to agent websocket connections.
const (
	// writeWait is how far in the future the write deadline is pushed each
	// time it is refreshed.
	writeWait = 12 * time.Second

	// readWait is how far in the future the read deadline is pushed each
	// time it is refreshed.
	readWait = 12 * time.Second

	// pongWait is the time allowed to read the next pong message from the
	// peer. The ping period must be shorter than this.
	pongWait = 10 * time.Second

	// pingPeriod is the interval between pings sent to the peer; it is 90%
	// of pongWait so that it always stays below it.
	pingPeriod = pongWait * 9 / 10

	// maxMessageSize is the largest message, in bytes, accepted from the peer.
	maxMessageSize = 4096
)
- // WsClient is a middleman between the websocket connection and the hub.
- type WsClient struct {
- Token string `json:"id"`
- hub *Hub
- // The websocket connection.
- conn *websocket.Conn
- // Buffered channel of outbound messages.
- send chan []byte
- callbacks map[string]chan *models.AgentData
- }
- // readPump pumps messages from the websocket connection to the hub.
- //
- // The application runs readPump in a per-connection goroutine. The application
- // ensures that there is at most one reader on a connection by executing all
- // reads from this goroutine.
- func (c *WsClient) readPump() {
- defer func() {
- c.hub.unregister <- c
- _ = c.conn.Close()
- log.Printf("readPump closed")
- }()
- c.conn.SetReadLimit(maxMessageSize)
- _ = c.conn.SetReadDeadline(time.Now().Add(readWait))
- _ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
- c.conn.SetPingHandler(func(string) error {
- log.Printf("ping: %v", time.Now())
- _ = c.conn.SetReadDeadline(time.Now().Add(readWait))
- _ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
- _ = c.conn.WriteMessage(websocket.PongMessage, []byte{})
- return nil
- })
- for {
- agentData := &models.AgentData{}
- err := c.conn.ReadJSON(&agentData)
- log.Printf("recv: %v", err)
- if err != nil {
- if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
- log.Printf("error: %v", err)
- }
- log.Printf("read message: %v", err)
- break
- }
- r, ok := c.callbacks[agentData.RequestId]
- if ok {
- r <- agentData
- continue
- }
- handler, ok := c.hub.handlers[agentData.Type]
- if ok {
- go func() {
- result, err := handler(c, agentData)
- log.Printf("result: %v,err: %v", result, err)
- if err != nil {
- agentData.Success = false
- agentData.Data = make(map[string]interface{})
- agentData.Msg = err.Error()
- } else {
- agentData.Success = true
- agentData.Data = result
- agentData.Msg = ""
- }
- c.JustSend(agentData)
- }()
- }
- }
- }
- /*
- Send 发送消息
- 参数:
- message - 消息体
- result - 返回结构体指针
- timeout - 超时时间
- 返回值:
- error - 错误
- */
- func (c *WsClient) Send(data *models.AgentData, timeout time.Duration) (*models.AgentData, error) {
- var rec = make(chan *models.AgentData)
- if data.RequestId == "" {
- id, _ := uuid.GenerateUUID()
- data.RequestId = id
- }
- c.callbacks[data.RequestId] = rec
- timer := time.NewTimer(timeout)
- defer func() {
- delete(c.callbacks, data.RequestId)
- timer.Stop()
- }()
- err := c.conn.WriteJSON(data)
- if err != nil {
- return nil, err
- }
- for {
- select {
- case res, ok := <-rec:
- if !ok {
- return nil, errors.New("receive data fail")
- }
- return res, nil
- case <-timer.C:
- return nil, errors.New("timeout")
- }
- }
- }
- // JustSend 仅发送,不管是否成功,用于消息回复
- func (c *WsClient) JustSend(message interface{}) {
- err := c.conn.WriteJSON(message)
- log.Printf("JustSend: message: %s, %v", message, err)
- }
|