|
@@ -26,70 +26,58 @@ const (
|
|
|
maxMessageSize = 1024
|
|
|
)
|
|
|
|
|
|
-var mutex = &sync.Mutex{}
|
|
|
+// 消息写锁,不支持并发写消息
|
|
|
+var writeLock = &sync.Mutex{}
|
|
|
|
|
|
// Agent 客户端的代码
|
|
|
// - SSL Y or N
|
|
|
type Agent struct {
|
|
|
- Url string `json:"url"`
|
|
|
- Token string `json:"token"`
|
|
|
- SSL string `json:"ssl"`
|
|
|
- dialer *websocket.Dialer
|
|
|
- ReconnectInterval time.Duration
|
|
|
- Ctx context.Context
|
|
|
- Cancel context.CancelFunc
|
|
|
- conn *websocket.Conn
|
|
|
- errChan chan error
|
|
|
- callbacks map[string]chan *models.AgentData
|
|
|
- handlers map[string]func(agent *Agent, message *models.AgentData) (interface{}, error)
|
|
|
- Nginx *models.Nginx
|
|
|
+ Url string `json:"url"`
|
|
|
+ Token string `json:"token"`
|
|
|
+ SSL string `json:"ssl"`
|
|
|
+ Nginx *models.Nginx
|
|
|
+ Connected bool
|
|
|
+ callbacks map[string]chan *models.AgentData
|
|
|
+ handlers map[string]func(agent *Agent, message *models.AgentData) (interface{}, error)
|
|
|
+ dialer *websocket.Dialer
|
|
|
+ Ctx context.Context
|
|
|
+ Cancel context.CancelFunc
|
|
|
+ conn *websocket.Conn
|
|
|
+ errChan chan error
|
|
|
}
|
|
|
|
|
|
func NewAgent(url string, ssl string, token string) *Agent {
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
return &Agent{
|
|
|
- Url: url,
|
|
|
- Token: token,
|
|
|
- SSL: ssl,
|
|
|
- dialer: &websocket.Dialer{ReadBufferSize: 1024, WriteBufferSize: 1024},
|
|
|
- ReconnectInterval: 10 * time.Second,
|
|
|
- Ctx: ctx,
|
|
|
- Cancel: cancel,
|
|
|
- errChan: make(chan error, 3),
|
|
|
- callbacks: make(map[string]chan *models.AgentData),
|
|
|
- handlers: make(map[string]func(agent *Agent, message *models.AgentData) (interface{}, error)),
|
|
|
+ Url: url,
|
|
|
+ Token: token,
|
|
|
+ SSL: ssl,
|
|
|
+ callbacks: make(map[string]chan *models.AgentData),
|
|
|
+ handlers: make(map[string]func(agent *Agent, message *models.AgentData) (interface{}, error)),
|
|
|
+ dialer: &websocket.Dialer{ReadBufferSize: 1024, WriteBufferSize: 1024},
|
|
|
+ Ctx: ctx,
|
|
|
+ Cancel: cancel,
|
|
|
+ errChan: make(chan error, 3),
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (a *Agent) SetMessageHandler(key string, handler func(agent *Agent, message *models.AgentData) (interface{}, error)) {
|
|
|
- a.handlers[key] = handler
|
|
|
-}
|
|
|
-
|
|
|
func (a *Agent) Reset() {
|
|
|
- mutex.Lock()
|
|
|
- defer mutex.Unlock()
|
|
|
- for k, _ := range a.callbacks {
|
|
|
- delete(a.callbacks, k)
|
|
|
- }
|
|
|
+ a.Connected = false
|
|
|
a.Cancel() // 通知协程退出
|
|
|
+ if a.conn != nil {
|
|
|
+ _ = a.conn.Close()
|
|
|
+ }
|
|
|
+ ctx, cancel := context.WithCancel(context.Background())
|
|
|
+ a.Ctx = ctx
|
|
|
+ a.Cancel = cancel
|
|
|
log.Printf("reset")
|
|
|
}
|
|
|
|
|
|
-func (a *Agent) setCallback(key string, c chan *models.AgentData) {
|
|
|
- mutex.Lock()
|
|
|
- defer mutex.Unlock()
|
|
|
- a.callbacks[key] = c
|
|
|
-}
|
|
|
-
|
|
|
-func (a *Agent) removeCallback(key string) {
|
|
|
- mutex.Lock()
|
|
|
- defer mutex.Unlock()
|
|
|
- delete(a.callbacks, key)
|
|
|
+func (a *Agent) SetMessageHandler(key string, handler func(agent *Agent, message *models.AgentData) (interface{}, error)) {
|
|
|
+ a.handlers[key] = handler
|
|
|
}
|
|
|
|
|
|
func (a *Agent) getCallback(key string) chan *models.AgentData {
|
|
|
- mutex.Lock()
|
|
|
- defer mutex.Unlock()
|
|
|
ret, ok := a.callbacks[key]
|
|
|
if ok {
|
|
|
return ret
|
|
@@ -145,8 +133,12 @@ func (a *Agent) read() {
|
|
|
|
|
|
func (a *Agent) ping() {
|
|
|
log.Printf("ping start")
|
|
|
+ _ = a.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
|
ticker := time.NewTicker(pingPeriod)
|
|
|
- defer ticker.Stop()
|
|
|
+ defer func() {
|
|
|
+ ticker.Stop()
|
|
|
+ log.Printf("ping closed")
|
|
|
+ }()
|
|
|
for {
|
|
|
select {
|
|
|
case <-a.Ctx.Done():
|
|
@@ -172,45 +164,42 @@ func (a *Agent) Run() error {
|
|
|
protocol = "wss"
|
|
|
}
|
|
|
url := fmt.Sprintf("%v://%v%v", protocol, a.Url, constants.AgentConnectUrl)
|
|
|
- for {
|
|
|
- conn, _, err := a.dialer.Dial(url, map[string][]string{"token": {a.Token}})
|
|
|
- if err != nil {
|
|
|
- log.Printf("agent: failed to connect to %s: %s\n", a.Url, err)
|
|
|
- time.Sleep(a.ReconnectInterval)
|
|
|
- continue
|
|
|
- }
|
|
|
- log.Printf("agent: connected to %s\n", a.Url)
|
|
|
+ conn, _, err := a.dialer.Dial(url, map[string][]string{"token": {a.Token}})
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("agent: failed to connect to %s: %s\n", a.Url, err)
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ log.Printf("agent: connected to %s\n", a.Url)
|
|
|
|
|
|
- conn.SetReadLimit(maxMessageSize)
|
|
|
+ conn.SetReadLimit(maxMessageSize)
|
|
|
+ _ = conn.SetReadDeadline(time.Now().Add(readWait))
|
|
|
+ conn.SetPongHandler(func(string) error {
|
|
|
+ log.Printf("pong")
|
|
|
_ = conn.SetReadDeadline(time.Now().Add(readWait))
|
|
|
- conn.SetPongHandler(func(string) error {
|
|
|
- log.Printf("pong")
|
|
|
- _ = conn.SetReadDeadline(time.Now().Add(readWait))
|
|
|
- return nil
|
|
|
- })
|
|
|
- a.conn = conn
|
|
|
- go a.ping()
|
|
|
- go a.read()
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+ a.conn = conn
|
|
|
+ go a.ping()
|
|
|
+ go a.read()
|
|
|
+ a.Connected = true
|
|
|
|
|
|
- handler := a.handlers[models.ServerConnected]
|
|
|
- if handler != nil {
|
|
|
- go func() {
|
|
|
- _, _ = handler(a, &models.AgentData{
|
|
|
- Type: models.ServerConnected,
|
|
|
- })
|
|
|
- }()
|
|
|
- }
|
|
|
- log.Printf("agent: waiting for closed")
|
|
|
- // 等待错误或连接关闭
|
|
|
- select {
|
|
|
- case err := <-a.errChan:
|
|
|
- log.Println("连接异常:", err)
|
|
|
- a.Reset()
|
|
|
- conn.Close() // 关闭当前连接
|
|
|
- log.Println("连接已关闭,5秒后重连...")
|
|
|
- time.Sleep(a.ReconnectInterval)
|
|
|
- }
|
|
|
+ handler := a.handlers[models.ServerConnected]
|
|
|
+ if handler != nil {
|
|
|
+ go func() {
|
|
|
+ _, _ = handler(a, &models.AgentData{
|
|
|
+ Type: models.ServerConnected,
|
|
|
+ })
|
|
|
+ }()
|
|
|
+ }
|
|
|
+ log.Printf("agent: waiting for closed")
|
|
|
+ // 等待错误或连接关闭
|
|
|
+ select {
|
|
|
+ case err := <-a.errChan:
|
|
|
+ log.Println("连接异常:", err)
|
|
|
+ a.Reset()
|
|
|
+ log.Println("连接已关闭,5秒后重连...")
|
|
|
}
|
|
|
+ return errors.New("closed")
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -226,20 +215,22 @@ Send 发送消息
|
|
|
error - 错误
|
|
|
*/
|
|
|
func (a *Agent) Send(data *models.AgentData, timeout time.Duration) (*models.AgentData, error) {
|
|
|
- var rec = make(chan *models.AgentData)
|
|
|
if len(data.RequestId) == 0 {
|
|
|
data.RequestId = utils.GenerateUUID()
|
|
|
}
|
|
|
requestId := data.RequestId
|
|
|
- a.setCallback(requestId, rec)
|
|
|
timer := time.NewTimer(timeout)
|
|
|
+ var rec = make(chan *models.AgentData, 5)
|
|
|
+ a.callbacks[requestId] = rec
|
|
|
defer func() {
|
|
|
timer.Stop()
|
|
|
- a.removeCallback(requestId)
|
|
|
- log.Printf("release send chan")
|
|
|
+ delete(a.callbacks, requestId)
|
|
|
close(rec)
|
|
|
+ log.Printf("release send chan")
|
|
|
}()
|
|
|
+ writeLock.Lock()
|
|
|
err := a.conn.WriteJSON(data)
|
|
|
+ writeLock.Unlock()
|
|
|
if err != nil {
|
|
|
log.Printf("write message fail: %v", err)
|
|
|
return nil, err
|
|
@@ -248,13 +239,12 @@ func (a *Agent) Send(data *models.AgentData, timeout time.Duration) (*models.Age
|
|
|
for {
|
|
|
select {
|
|
|
case <-a.Ctx.Done():
|
|
|
- return nil, errors.New("timeout")
|
|
|
+ return nil, errors.New("conn closed")
|
|
|
case res, ok := <-rec:
|
|
|
- if !ok {
|
|
|
- return nil, errors.New("receive data fail")
|
|
|
+ if ok {
|
|
|
+ return res, nil
|
|
|
}
|
|
|
- return res, err
|
|
|
-
|
|
|
+ return nil, errors.New("receive data fail")
|
|
|
case <-timer.C:
|
|
|
return nil, errors.New("timeout")
|
|
|
}
|
|
@@ -263,6 +253,8 @@ func (a *Agent) Send(data *models.AgentData, timeout time.Duration) (*models.Age
|
|
|
|
|
|
// JustSend 仅发送,不管是否成功,用于消息回复
|
|
|
func (a *Agent) JustSend(message interface{}) {
|
|
|
+ writeLock.Lock()
|
|
|
+ defer writeLock.Unlock()
|
|
|
err := a.conn.WriteJSON(message)
|
|
|
log.Printf("JustSend: %v", err)
|
|
|
}
|