hub.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package agent_server
  2. import (
  3. "nginx-ui/server/models"
  4. )
  5. // Hub maintains the set of active clients and broadcasts messages to the
  6. // clients.
  7. type Hub struct {
  8. // Registered clients.
  9. clientMap map[string]*WsClient
  10. // Inbound messages from the clients.
  11. broadcast chan []byte
  12. // Register requests from the clients.
  13. register chan *WsClient
  14. // Unregister requests from clients.
  15. unregister chan *WsClient
  16. handlers map[string]func(c *WsClient, message *models.AgentData) (interface{}, error)
  17. }
  18. func newHub() *Hub {
  19. return &Hub{
  20. broadcast: make(chan []byte),
  21. register: make(chan *WsClient),
  22. clientMap: make(map[string]*WsClient),
  23. handlers: make(map[string]func(c *WsClient, message *models.AgentData) (interface{}, error)),
  24. }
  25. }
  26. func (h *Hub) SetMessageHandler(key string, handler func(c *WsClient, message *models.AgentData) (interface{}, error)) {
  27. h.handlers[key] = handler
  28. }
  29. func (h *Hub) FindClient(key string) (*WsClient, bool) {
  30. c, ok := h.clientMap[key]
  31. return c, ok
  32. }
  33. func (h *Hub) run() {
  34. for {
  35. select {
  36. case client := <-h.register:
  37. old, ok := h.clientMap[client.Token]
  38. if ok {
  39. old.Close()
  40. }
  41. h.clientMap[client.Token] = client
  42. case client := <-h.unregister:
  43. if _, ok := h.clientMap[client.Token]; ok {
  44. delete(h.clientMap, client.Token)
  45. }
  46. client.Close()
  47. delete(h.clientMap, client.Token)
  48. case message := <-h.broadcast:
  49. for _, client := range h.clientMap {
  50. select {
  51. case client.send <- message:
  52. default:
  53. close(client.send)
  54. delete(h.clientMap, client.Token)
  55. }
  56. }
  57. }
  58. }
  59. }
  60. var AgentHub = newHub()