ws_client.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package agent_server
  2. import (
  3. "errors"
  4. "github.com/gorilla/websocket"
  5. "github.com/hashicorp/go-uuid"
  6. "log"
  7. "nginx-ui/server/models"
  8. "time"
  9. )
  // Timing and size limits applied to an agent websocket connection.
  const (
  	// writeWait is the time allowed to write a message to the peer.
  	writeWait = 12 * time.Second

  	// readWait is the time the connection may wait for inbound traffic
  	// before a read fails; readPump uses it for the read deadline.
  	readWait = 12 * time.Second

  	// pongWait is the time allowed to read the next pong message from the
  	// peer. pingPeriod must be shorter than this.
  	pongWait = 10 * time.Second

  	// pingPeriod is how often pings are sent to the peer: nine tenths of
  	// pongWait, so it is always less than pongWait.
  	pingPeriod = pongWait * 9 / 10

  	// maxMessageSize is the maximum message size allowed from the peer.
  	maxMessageSize = 4096
  )
  21. // WsClient is a middleman between the websocket connection and the hub.
  22. type WsClient struct {
  23. Token string `json:"id"`
  24. hub *Hub
  25. // The websocket connection.
  26. conn *websocket.Conn
  27. // Buffered channel of outbound messages.
  28. send chan []byte
  29. callbacks map[string]chan *models.AgentData
  30. }
  31. // readPump pumps messages from the websocket connection to the hub.
  32. //
  33. // The application runs readPump in a per-connection goroutine. The application
  34. // ensures that there is at most one reader on a connection by executing all
  35. // reads from this goroutine.
  36. func (c *WsClient) readPump() {
  37. defer func() {
  38. c.hub.unregister <- c
  39. _ = c.conn.Close()
  40. log.Printf("readPump closed")
  41. }()
  42. c.conn.SetReadLimit(maxMessageSize)
  43. _ = c.conn.SetReadDeadline(time.Now().Add(readWait))
  44. _ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
  45. c.conn.SetPingHandler(func(string) error {
  46. log.Printf("ping: %v", time.Now())
  47. _ = c.conn.SetReadDeadline(time.Now().Add(readWait))
  48. _ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
  49. _ = c.conn.WriteMessage(websocket.PongMessage, []byte{})
  50. return nil
  51. })
  52. for {
  53. agentData := &models.AgentData{}
  54. err := c.conn.ReadJSON(&agentData)
  55. log.Printf("recv: %v", err)
  56. if err != nil {
  57. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  58. log.Printf("error: %v", err)
  59. }
  60. log.Printf("read message: %v", err)
  61. break
  62. }
  63. r, ok := c.callbacks[agentData.RequestId]
  64. if ok {
  65. r <- agentData
  66. continue
  67. }
  68. handler, ok := c.hub.handlers[agentData.Type]
  69. if ok {
  70. go func() {
  71. result, err := handler(c, agentData)
  72. log.Printf("result: %v,err: %v", result, err)
  73. if err != nil {
  74. agentData.Success = false
  75. agentData.Data = make(map[string]interface{})
  76. agentData.Msg = err.Error()
  77. } else {
  78. agentData.Success = true
  79. agentData.Data = result
  80. agentData.Msg = ""
  81. }
  82. c.JustSend(agentData)
  83. }()
  84. }
  85. }
  86. }
  87. /*
  88. Send 发送消息
  89. 参数:
  90. message - 消息体
  91. result - 返回结构体指针
  92. timeout - 超时时间
  93. 返回值:
  94. error - 错误
  95. */
  96. func (c *WsClient) Send(data *models.AgentData, timeout time.Duration) (*models.AgentData, error) {
  97. var rec = make(chan *models.AgentData)
  98. if data.RequestId == "" {
  99. id, _ := uuid.GenerateUUID()
  100. data.RequestId = id
  101. }
  102. c.callbacks[data.RequestId] = rec
  103. timer := time.NewTimer(timeout)
  104. defer func() {
  105. delete(c.callbacks, data.RequestId)
  106. timer.Stop()
  107. }()
  108. err := c.conn.WriteJSON(data)
  109. if err != nil {
  110. return nil, err
  111. }
  112. for {
  113. select {
  114. case res, ok := <-rec:
  115. if !ok {
  116. return nil, errors.New("receive data fail")
  117. }
  118. return res, nil
  119. case <-timer.C:
  120. return nil, errors.New("timeout")
  121. }
  122. }
  123. }
  124. // JustSend 仅发送,不管是否成功,用于消息回复
  125. func (c *WsClient) JustSend(message interface{}) {
  126. err := c.conn.WriteJSON(message)
  127. log.Printf("JustSend: message: %s, %v", message, err)
  128. }