package agent import ( "context" "errors" "fmt" "github.com/gorilla/websocket" "log" "nginx-ui/server/constants" "nginx-ui/server/models" "nginx-ui/server/utils" "sync" "time" ) const ( // Time allowed to write a message to the peer. writeWait = 12 * time.Second readWait = 12 * time.Second // Send pings to peer with this period. Must be less than pongWait. // 心跳必须要小于writeWait ,且每次发送心跳都需要重新设置,否则会导致连接断开 pingPeriod = 5 * time.Second // Maximum message size allowed from peer. maxMessageSize = 1024 ) var mutex = &sync.Mutex{} // Agent 客户端的代码 // - SSL Y or N type Agent struct { Url string `json:"url"` Token string `json:"token"` SSL string `json:"ssl"` dialer *websocket.Dialer ReconnectInterval time.Duration Ctx context.Context Cancel context.CancelFunc conn *websocket.Conn errChan chan error callbacks map[string]chan *models.AgentData handlers map[string]func(agent *Agent, message *models.AgentData) (interface{}, error) Nginx *models.Nginx } func NewAgent(url string, ssl string, token string) *Agent { ctx, cancel := context.WithCancel(context.Background()) return &Agent{ Url: url, Token: token, SSL: ssl, dialer: &websocket.Dialer{ReadBufferSize: 1024, WriteBufferSize: 1024}, ReconnectInterval: 10 * time.Second, Ctx: ctx, Cancel: cancel, errChan: make(chan error, 3), callbacks: make(map[string]chan *models.AgentData), handlers: make(map[string]func(agent *Agent, message *models.AgentData) (interface{}, error)), } } func (a *Agent) SetMessageHandler(key string, handler func(agent *Agent, message *models.AgentData) (interface{}, error)) { a.handlers[key] = handler } func (a *Agent) Reset() { mutex.Lock() defer mutex.Unlock() for k, _ := range a.callbacks { delete(a.callbacks, k) } a.Cancel() // 通知协程退出 log.Printf("reset") } func (a *Agent) setCallback(key string, c chan *models.AgentData) { mutex.Lock() defer mutex.Unlock() a.callbacks[key] = c } func (a *Agent) removeCallback(key string) { mutex.Lock() defer mutex.Unlock() delete(a.callbacks, key) } func (a *Agent) getCallback(key string) chan *models.AgentData { mutex.Lock() defer mutex.Unlock() ret, ok := a.callbacks[key] if ok { return ret } return nil } func (a *Agent) read() { log.Printf("read start") defer func() { log.Printf("read closed") }() for { select { case <-a.Ctx.Done(): return default: data := &models.AgentData{} err := a.conn.ReadJSON(data) if err != nil { log.Printf("parse receive message fail: %v", err) a.errChan <- err break } log.Printf("agent: recv: %v", data) callback := a.getCallback(data.RequestId) if callback != nil { callback <- data continue } handler, ok := a.handlers[data.Type] if !ok { log.Printf("message type: %v has not handler", data.Type) continue } go func() { result, err := handler(a, data) log.Printf("agent: handler result: %v, err: %v", result, err) if err != nil { data.Data = make([]byte, 0) data.Success = false data.Msg = err.Error() } else { data.Data = result data.Success = true data.Msg = "" } a.JustSend(data) }() } } } func (a *Agent) ping() { log.Printf("ping start") ticker := time.NewTicker(pingPeriod) defer ticker.Stop() for { select { case <-a.Ctx.Done(): return case <-ticker.C: if a.conn == nil { continue } _ = a.conn.SetWriteDeadline(time.Now().Add(writeWait)) err := a.conn.WriteMessage(websocket.PingMessage, nil) log.Printf("ping: %v", err) if err != nil { a.errChan <- err return } } } } func (a *Agent) Run() error { protocol := "ws" if a.SSL == "Y" { protocol = "wss" } url := fmt.Sprintf("%v://%v%v", protocol, a.Url, constants.AgentConnectUrl) for { conn, _, err := a.dialer.Dial(url, map[string][]string{"token": {a.Token}}) if err != nil { log.Printf("agent: failed to connect to %s: %s\n", a.Url, err) time.Sleep(a.ReconnectInterval) continue } log.Printf("agent: connected to %s\n", a.Url) conn.SetReadLimit(maxMessageSize) _ = conn.SetReadDeadline(time.Now().Add(readWait)) conn.SetPongHandler(func(string) error { log.Printf("pong") _ = conn.SetReadDeadline(time.Now().Add(readWait)) return nil }) a.conn = conn go a.ping() go a.read() handler := a.handlers[models.ServerConnected] if handler != nil { go func() { _, _ = handler(a, &models.AgentData{ Type: models.ServerConnected, }) }() } log.Printf("agent: waiting for closed") // 等待错误或连接关闭 select { case err := <-a.errChan: log.Println("连接异常:", err) a.Reset() conn.Close() // 关闭当前连接 log.Println("连接已关闭,5秒后重连...") time.Sleep(a.ReconnectInterval) } } } /* Send 发送消息 参数: message - 消息体 result - 返回结构体指针 timeout - 超时时间 返回值: error - 错误 */ func (a *Agent) Send(data *models.AgentData, timeout time.Duration) (*models.AgentData, error) { var rec = make(chan *models.AgentData) if len(data.RequestId) == 0 { data.RequestId = utils.GenerateUUID() } requestId := data.RequestId a.setCallback(requestId, rec) timer := time.NewTimer(timeout) defer func() { timer.Stop() a.removeCallback(requestId) log.Printf("release send chan") close(rec) }() err := a.conn.WriteJSON(data) if err != nil { log.Printf("write message fail: %v", err) return nil, err } log.Printf("Send: %v", data) for { select { case <-a.Ctx.Done(): return nil, errors.New("timeout") case res, ok := <-rec: if !ok { return nil, errors.New("receive data fail") } return res, err case <-timer.C: return nil, errors.New("timeout") } } } // JustSend 仅发送,不管是否成功,用于消息回复 func (a *Agent) JustSend(message interface{}) { err := a.conn.WriteJSON(message) log.Printf("JustSend: %v", err) }