ws_client.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package agent_server
  2. import (
  3. "errors"
  4. "github.com/gorilla/websocket"
  5. "github.com/hashicorp/go-uuid"
  6. "log"
  7. "nginx-ui/server/models"
  8. "sync"
  9. "time"
  10. )
  // Timing and size limits applied to every agent websocket connection.
  const (
  	// writeWait bounds how long a write to the peer may take.
  	writeWait = 12 * time.Second

  	// readWait bounds how long the connection may stay silent before a
  	// read fails.
  	readWait = 12 * time.Second

  	// pongWait is the time allowed to read the next pong message from the
  	// peer. The ping period must be shorter than this.
  	pongWait = 10 * time.Second

  	// pingPeriod is the interval at which pings are sent to the peer. It is
  	// derived as 90% of pongWait so it always stays below it.
  	pingPeriod = 9 * pongWait / 10

  	// maxMessageSize is the largest message, in bytes, accepted from the peer.
  	maxMessageSize = 4096
  )
  // WsClient is a middleman between the websocket connection and the hub.
  type WsClient struct {
  	// Token identifies the client; it is serialized under the JSON key "id".
  	Token string `json:"id"`
  	// The websocket connection.
  	conn *websocket.Conn
  	// Buffered channel of outbound messages.
  	send chan []byte
  	// callbacks maps a request ID to the channel on which Send waits for the
  	// matching reply; readPump delivers incoming messages to it.
  	callbacks map[string]chan *models.AgentData
  	// NOTE(review): presumably intended to guard callbacks and/or writes on
  	// conn, but no method visible in this file locks it — confirm.
  	lock *sync.Mutex
  }
  32. func NewWsClient(token string, conn *websocket.Conn) *WsClient {
  33. return &WsClient{
  34. Token: token,
  35. conn: conn,
  36. send: make(chan []byte, 4096),
  37. callbacks: make(map[string]chan *models.AgentData),
  38. }
  39. }
  40. // readPump pumps messages from the websocket connection to the hub.
  41. //
  42. // The application runs readPump in a per-connection goroutine. The application
  43. // ensures that there is at most one reader on a connection by executing all
  44. // reads from this goroutine.
  45. func (c *WsClient) readPump() {
  46. defer func() {
  47. AgentHub.unregister <- c
  48. log.Printf("readPump closed")
  49. }()
  50. c.conn.SetReadLimit(maxMessageSize)
  51. _ = c.conn.SetReadDeadline(time.Now().Add(readWait))
  52. _ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
  53. c.conn.SetPingHandler(func(string) error {
  54. log.Printf("ping: %v", time.Now())
  55. _ = c.conn.SetReadDeadline(time.Now().Add(readWait))
  56. _ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
  57. _ = c.conn.WriteMessage(websocket.PongMessage, []byte{})
  58. return nil
  59. })
  60. for {
  61. agentData := &models.AgentData{}
  62. err := c.conn.ReadJSON(&agentData)
  63. log.Printf("recv: %v", err)
  64. if err != nil {
  65. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  66. log.Printf("error: %v", err)
  67. }
  68. log.Printf("read message: %v", err)
  69. break
  70. }
  71. r, ok := c.callbacks[agentData.RequestId]
  72. if ok {
  73. r <- agentData
  74. continue
  75. }
  76. handler, ok := AgentHub.handlers[agentData.Type]
  77. if ok {
  78. go func() {
  79. result, err := handler(c, agentData)
  80. log.Printf("result: %v,err: %v", result, err)
  81. if err != nil {
  82. agentData.Success = false
  83. agentData.Data = make(map[string]interface{})
  84. agentData.Msg = err.Error()
  85. } else {
  86. agentData.Success = true
  87. agentData.Data = result
  88. agentData.Msg = ""
  89. }
  90. c.JustSend(agentData)
  91. }()
  92. }
  93. }
  94. }
  95. /*
  96. Send 发送消息
  97. 参数:
  98. message - 消息体
  99. result - 返回结构体指针
  100. timeout - 超时时间
  101. 返回值:
  102. error - 错误
  103. */
  104. func (c *WsClient) Send(data *models.AgentData, timeout time.Duration) (*models.AgentData, error) {
  105. var rec = make(chan *models.AgentData)
  106. if data.RequestId == "" {
  107. id, _ := uuid.GenerateUUID()
  108. data.RequestId = id
  109. }
  110. c.callbacks[data.RequestId] = rec
  111. timer := time.NewTimer(timeout)
  112. defer func() {
  113. delete(c.callbacks, data.RequestId)
  114. timer.Stop()
  115. }()
  116. err := c.conn.WriteJSON(data)
  117. if err != nil {
  118. return nil, err
  119. }
  120. for {
  121. select {
  122. case res, ok := <-rec:
  123. if !ok {
  124. return nil, errors.New("receive data fail")
  125. }
  126. return res, nil
  127. case <-timer.C:
  128. return nil, errors.New("timeout")
  129. }
  130. }
  131. }
  132. // JustSend 仅发送,不管是否成功,用于消息回复
  133. func (c *WsClient) JustSend(message interface{}) {
  134. err := c.conn.WriteJSON(message)
  135. log.Printf("JustSend: message: %s, %v", message, err)
  136. }
  137. func (c *WsClient) Close() {
  138. _ = c.conn.Close()
  139. close(c.send)
  140. }