// Package agent implements a websocket client: it dials the configured URL,
// keeps the connection alive with pings and exchanges JSON-encoded
// models.AgentData messages with the peer.
package agent

import (
	"context"
	"errors"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/gorilla/websocket"

	"nginx-ui/server/constants"
	"nginx-ui/server/models"
	"nginx-ui/server/utils"
)

const (
	// writeWait is the time allowed to write a message to the peer.
	writeWait = 12 * time.Second
	// readWait is the read deadline; it is extended whenever a pong arrives.
	readWait = 12 * time.Second

	// pingPeriod is the interval between pings sent to the peer. It must be
	// shorter than writeWait (and readWait): the write deadline is refreshed
	// on every ping, and the connection would be dropped if it expired
	// between two pings.
	pingPeriod = 5 * time.Second

	// maxMessageSize is the maximum message size allowed from the peer.
	maxMessageSize = 1024
)

// writeLock serialises writes to the websocket connection, which does not
// support concurrent writers. It is package-level and therefore shared by
// every Agent in the process.
var writeLock = &sync.Mutex{}

// Agent is the websocket client.
//   - SSL is "Y" or "N"; "Y" selects the wss scheme.
type Agent struct {
	Url   string `json:"url"`
	Token string `json:"token"`
	SSL   string `json:"ssl"`
	Nginx *models.Nginx
	// Connected is set to true by Run once the connection is established and
	// back to false by Reset.
	Connected bool
	// callbacks maps a request id to the channel on which Send waits for the
	// reply. It is guarded by callbackMu because Send and the read goroutine
	// access it concurrently.
	callbacks  map[string]chan *models.AgentData
	callbackMu sync.Mutex
	// handlers maps a message type to the function handling unsolicited
	// messages of that type. It is not synchronised; register handlers before
	// calling Run.
	handlers map[string]func(agent *Agent, message *models.AgentData) (interface{}, error)
	dialer   *websocket.Dialer
	// Ctx and Cancel control the lifetime of the current session's
	// goroutines; Reset replaces both.
	Ctx    context.Context
	Cancel context.CancelFunc
	conn   *websocket.Conn
	// errChan carries fatal connection errors from the read and ping
	// goroutines to Run.
	errChan chan error
}

// NewAgent returns an Agent for the given address, SSL flag ("Y"/"N") and
// token. The agent is not connected until Run is called.
func NewAgent(url string, ssl string, token string) *Agent {
	ctx, cancel := context.WithCancel(context.Background())
	return &Agent{
		Url:       url,
		Token:     token,
		SSL:       ssl,
		callbacks: make(map[string]chan *models.AgentData),
		handlers:  make(map[string]func(agent *Agent, message *models.AgentData) (interface{}, error)),
		dialer:    &websocket.Dialer{ReadBufferSize: 1024, WriteBufferSize: 1024},
		Ctx:       ctx,
		Cancel:    cancel,
		errChan:   make(chan error, 3),
	}
}

// Reset tears down the current session: it cancels the session context so the
// background goroutines exit, closes the connection if there is one, and
// installs a fresh context for the next Run.
func (a *Agent) Reset() {
	a.Connected = false
	a.Cancel() // tell the goroutines to exit
	if a.conn != nil {
		_ = a.conn.Close()
	}
	ctx, cancel := context.WithCancel(context.Background())
	a.Ctx = ctx
	a.Cancel = cancel
	log.Printf("reset")
}

// SetMessageHandler registers handler for messages whose Type equals key,
// replacing any handler previously registered under that key.
func (a *Agent) SetMessageHandler(key string, handler func(agent *Agent, message *models.AgentData) (interface{}, error)) {
	a.handlers[key] = handler
}

// getCallback returns the reply channel registered for the request id key, or
// nil when no Send is waiting for it.
func (a *Agent) getCallback(key string) chan *models.AgentData {
	a.callbackMu.Lock()
	defer a.callbackMu.Unlock()
	return a.callbacks[key]
}

// reportError hands a fatal connection error to Run without ever blocking the
// caller: when the buffer is full Run already has an error to act upon.
func (a *Agent) reportError(err error) {
	select {
	case a.errChan <- err:
	default:
	}
}

// drainErrors discards errors left in errChan by a previous session so they
// cannot tear down a freshly established connection.
func (a *Agent) drainErrors() {
	for {
		select {
		case <-a.errChan:
		default:
			return
		}
	}
}

// read receives messages until the session is cancelled or a read fails.
// A message whose RequestId matches a pending Send is delivered to that Send;
// any other message is dispatched to the handler registered for its Type, and
// the handler's outcome is sent back to the peer.
func (a *Agent) read() {
	// Pin this session's context and connection: Reset replaces a.Ctx, so
	// re-reading the fields could attach this goroutine to a later session.
	ctx, conn := a.Ctx, a.conn

	log.Printf("read start")
	defer log.Printf("read closed")

	for {
		if ctx.Err() != nil {
			return
		}

		data := &models.AgentData{}
		if err := conn.ReadJSON(data); err != nil {
			log.Printf("parse receive message fail: %v", err)
			a.reportError(err)
			// Stop reading: the error makes Run reset the connection, and
			// reading a failed connection again only produces more errors.
			return
		}
		log.Printf("agent: recv: %v", data)

		if callback := a.getCallback(data.RequestId); callback != nil {
			// Non-blocking: the waiting Send may have given up already, and
			// a full buffer must not stall the read loop.
			select {
			case callback <- data:
			default:
				log.Printf("agent: drop reply for request %v: no receiver", data.RequestId)
			}
			continue
		}

		handler, ok := a.handlers[data.Type]
		if !ok {
			log.Printf("message type: %v has not handler", data.Type)
			continue
		}

		// Run the handler off the read loop so a slow handler does not delay
		// subsequent messages (including replies to Send).
		go func() {
			result, err := handler(a, data)
			log.Printf("agent: handler result: %v, err: %v", result, err)
			if err != nil {
				data.Data = make([]byte, 0)
				data.Success = false
				data.Msg = err.Error()
			} else {
				data.Data = result
				data.Success = true
				data.Msg = ""
			}
			a.JustSend(data)
		}()
	}
}

// ping sends a websocket ping every pingPeriod until the session is cancelled
// or a ping fails. Each ping also refreshes the connection's write deadline.
func (a *Agent) ping() {
	// Pin this session's context and connection (see read).
	ctx, conn := a.Ctx, a.conn

	log.Printf("ping start")
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		log.Printf("ping closed")
	}()
	if conn == nil {
		return
	}

	// The connection allows a single writer at a time, so deadline updates
	// and pings take the same lock as Send and JustSend.
	writeLock.Lock()
	_ = conn.SetWriteDeadline(time.Now().Add(writeWait))
	writeLock.Unlock()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			writeLock.Lock()
			_ = conn.SetWriteDeadline(time.Now().Add(writeWait))
			err := conn.WriteMessage(websocket.PingMessage, nil)
			writeLock.Unlock()

			log.Printf("ping: %v", err)
			if err != nil {
				a.reportError(err)
				return
			}
		}
	}
}

// Run connects to the peer, starts the ping and read goroutines, invokes the
// models.ServerConnected handler if one is registered, and then blocks until
// the connection fails. It returns the dial error when connecting fails;
// otherwise it resets the agent and returns a non-nil "closed" error once the
// connection is lost. It never returns nil.
func (a *Agent) Run() error {
	protocol := "ws"
	if a.SSL == "Y" {
		protocol = "wss"
	}
	endpoint := fmt.Sprintf("%v://%v%v", protocol, a.Url, constants.AgentConnectUrl)

	conn, _, err := a.dialer.Dial(endpoint, map[string][]string{"token": {a.Token}})
	if err != nil {
		log.Printf("agent: failed to connect to %s: %s\n", a.Url, err)
		return err
	}
	log.Printf("agent: connected to %s\n", a.Url)

	conn.SetReadLimit(maxMessageSize)
	_ = conn.SetReadDeadline(time.Now().Add(readWait))
	// Every pong pushes the read deadline forward; without pongs the read
	// times out after readWait and the connection is treated as dead.
	conn.SetPongHandler(func(string) error {
		log.Printf("pong")
		_ = conn.SetReadDeadline(time.Now().Add(readWait))
		return nil
	})

	// Errors still queued from a previous session must not end this one.
	a.drainErrors()

	a.conn = conn
	go a.ping()
	go a.read()
	a.Connected = true

	if handler := a.handlers[models.ServerConnected]; handler != nil {
		go func() {
			_, _ = handler(a, &models.AgentData{
				Type: models.ServerConnected,
			})
		}()
	}

	log.Printf("agent: waiting for closed")
	// Wait for an error or for the connection to close.
	err = <-a.errChan
	log.Println("连接异常:", err)
	a.Reset()
	log.Println("连接已关闭,5秒后重连...")
	return errors.New("closed")
}

// Send writes data to the peer and waits for the reply carrying the same
// RequestId. A RequestId is generated when data has none.
//
// Parameters:
//   - data: the message to send.
//   - timeout: how long to wait, measured from just before the write.
//
// It returns the reply, or an error when the agent has no connection, the
// write fails, the session is cancelled, or the timeout expires.
func (a *Agent) Send(data *models.AgentData, timeout time.Duration) (*models.AgentData, error) {
	if a.conn == nil {
		return nil, errors.New("not connected")
	}
	// Pin the session context: Reset replaces a.Ctx, and this call must
	// observe the cancellation of the session it started in.
	ctx := a.Ctx

	if len(data.RequestId) == 0 {
		data.RequestId = utils.GenerateUUID()
	}
	requestId := data.RequestId

	timer := time.NewTimer(timeout)
	reply := make(chan *models.AgentData, 5)

	a.callbackMu.Lock()
	a.callbacks[requestId] = reply
	a.callbackMu.Unlock()

	defer func() {
		timer.Stop()
		a.callbackMu.Lock()
		delete(a.callbacks, requestId)
		a.callbackMu.Unlock()
		// The channel is deliberately not closed: the read goroutine may
		// still hold it, and sending on a closed channel would panic.
		log.Printf("release send chan")
	}()

	writeLock.Lock()
	err := a.conn.WriteJSON(data)
	writeLock.Unlock()
	if err != nil {
		log.Printf("write message fail: %v", err)
		return nil, err
	}
	log.Printf("Send: %v", data)

	select {
	case <-ctx.Done():
		return nil, errors.New("conn closed")
	case res := <-reply:
		return res, nil
	case <-timer.C:
		return nil, errors.New("timeout")
	}
}

// JustSend writes message to the peer without waiting for a reply. Failures
// are only logged; it is used to answer incoming messages.
func (a *Agent) JustSend(message interface{}) {
	if a.conn == nil {
		log.Printf("JustSend: %v", errors.New("not connected"))
		return
	}
	writeLock.Lock()
	defer writeLock.Unlock()
	err := a.conn.WriteJSON(message)
	log.Printf("JustSend: %v", err)
}