websocket_manager.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. package service
  2. import (
  3. "dashoo.cn/micro/app/model"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/gogf/gf/frame/g"
  7. "github.com/gogf/gf/net/ghttp"
  8. "github.com/gorilla/websocket"
  9. "net/http"
  10. "sync"
  11. "time"
  12. )
  13. // 管理连接
  14. func CheckRun() {
  15. // 检查心跳
  16. go func() {
  17. defer func() {
  18. if r := recover(); r != nil {
  19. g.Log("websocket").Println(r)
  20. }
  21. }()
  22. heartbeat()
  23. }()
  24. // 注册注销
  25. go func() {
  26. defer func() {
  27. if r := recover(); r != nil {
  28. g.Log("websocket").Println(r)
  29. }
  30. }()
  31. register()
  32. }()
  33. }
  34. // 客户端连接信息
  35. type Client struct {
  36. ID string // 连接ID
  37. AccountId string // 账号id, 一个账号可能有多个连接
  38. Socket *ghttp.WebSocket // 连接
  39. HeartbeatTime int64 // 前一次心跳时间
  40. }
  41. // 消息类型
  42. const (
  43. MessageTypeHeartbeat = "heartbeat" // 心跳
  44. MessageTypeRegister = "register" // 注册
  45. HeartbeatCheckTime = 9 // 心跳检测几秒检测一次
  46. HeartbeatTime = 20 // 心跳距离上一次的最大时间
  47. ChanBufferRegister = 100 // 注册chan缓冲
  48. ChanBufferUnregister = 100 // 注销chan大小
  49. )
  50. // 客户端管理
  51. type ClientManager struct {
  52. Clients map[string]*Client // 保存连接
  53. //Accounts map[string][]string // 账号和连接关系,map的key是账号id即:AccountId,这里主要考虑到一个账号多个连接 // 本系统无多连接情况
  54. mu *sync.Mutex
  55. }
  56. // 定义一个管理Manager
  57. var Manager = ClientManager{
  58. Clients: make(map[string]*Client), // 参与连接的用户,出于性能的考虑,需要设置最大连接数
  59. //Accounts: make(map[string][]string), // 账号和连接关系 // 本系统无多连接情况
  60. mu: new(sync.Mutex),
  61. }
  62. var (
  63. RegisterChan = make(chan *Client, ChanBufferRegister) // 注册
  64. unregisterChan = make(chan *Client, ChanBufferUnregister) // 注销
  65. )
  66. // 封装回复消息
  67. type ServiceMessage struct {
  68. Type string `json:"type"` // 类型
  69. Content ServiceMessageContent `json:"content"`
  70. }
  71. // Content
  72. type ServiceMessageContent struct {
  73. Body interface{} `json:"body"` // 主要数据
  74. MetaData interface{} `json:"meta_data"` // 扩展数据
  75. }
  76. // 创建消息
  77. func CreateReplyMsg(t string, content ServiceMessageContent) []byte {
  78. replyMsg := ServiceMessage{
  79. Type: t,
  80. Content: content,
  81. }
  82. msg, _ := json.Marshal(replyMsg)
  83. return msg
  84. }
  85. // 注册注销
  86. func register() {
  87. for {
  88. select {
  89. case conn := <-RegisterChan: // 新注册,新连接
  90. // 加入连接,进行管理
  91. accountBind(conn)
  92. // 回复消息
  93. content := CreateReplyMsg(MessageTypeRegister, ServiceMessageContent{})
  94. _ = conn.Socket.WriteMessage(websocket.TextMessage, content)
  95. case conn := <-unregisterChan: // 注销,或者没有心跳
  96. // 关闭连接
  97. _ = conn.Socket.Close()
  98. // 删除Client
  99. accountUnBind(conn)
  100. }
  101. }
  102. }
  103. // 绑定账号
  104. func accountBind(c *Client) {
  105. Manager.mu.Lock()
  106. // 如果不存在链接,加入到连接;若已存在,则替换
  107. Manager.Clients[c.ID] = c
  108. Manager.mu.Unlock()
  109. }
  110. // 解绑账号
  111. func accountUnBind(c *Client) {
  112. Manager.mu.Lock()
  113. // 取消连接
  114. if err := c.Socket.Close(); err != nil {
  115. g.Log("websocket").Error(err)
  116. }
  117. delete(Manager.Clients, c.ID)
  118. Manager.mu.Unlock()
  119. }
  120. // 维持心跳
  121. func heartbeat() {
  122. for {
  123. // 获取所有的Clients
  124. Manager.mu.Lock()
  125. var clients []*Client
  126. for _, c := range Manager.Clients {
  127. clients = append(clients, c)
  128. }
  129. Manager.mu.Unlock()
  130. g.Log("websocket").Info("开始本次心跳:")
  131. for _, c := range clients {
  132. g.Log("websocket").Info(c.ID)
  133. if time.Now().Unix()-c.HeartbeatTime > HeartbeatTime {
  134. accountUnBind(c)
  135. }
  136. }
  137. g.Log("websocket").Info("结束本次心跳:")
  138. time.Sleep(time.Second * HeartbeatCheckTime)
  139. }
  140. }
  141. // 根据账号获取连接
  142. func GetClient(uuid string) *Client {
  143. Manager.mu.Lock()
  144. if c, ok := Manager.Clients[uuid]; ok {
  145. Manager.mu.Unlock()
  146. return c
  147. }
  148. Manager.mu.Unlock()
  149. return nil
  150. }
  151. // 读取信息,即收到消息 TODO 服务端读取客户的返回消息,并维持心跳
  152. func (c *Client) Read() {
  153. defer func() {
  154. g.Log("websocket").Info("历史数据关闭")
  155. _ = c.Socket.Close()
  156. }()
  157. for {
  158. // 读取消息
  159. _, body, err := c.Socket.ReadMessage()
  160. if err != nil {
  161. break
  162. }
  163. var msg struct {
  164. Type string `json:"type"`
  165. }
  166. err = json.Unmarshal(body, &msg)
  167. if err != nil {
  168. g.Log("websocket").Error(err)
  169. continue
  170. }
  171. if msg.Type == MessageTypeHeartbeat { // 维持心跳消息
  172. // 刷新连接时间
  173. c.HeartbeatTime = time.Now().Unix()
  174. // 回复心跳
  175. replyMsg := CreateReplyMsg(MessageTypeHeartbeat, ServiceMessageContent{})
  176. err = c.Socket.WriteMessage(websocket.TextMessage, replyMsg)
  177. if err != nil {
  178. g.Log("websocket").Error(err)
  179. }
  180. continue
  181. }
  182. }
  183. }
  184. // 发送消息 TODO 服务端向客户端发送消息
  185. func Send(uuids []string, message ServiceMessage) error {
  186. msg, err := json.Marshal(message)
  187. if err != nil {
  188. return err
  189. }
  190. for _, uuid := range uuids {
  191. // 获取连接id
  192. client := GetClient(uuid)
  193. if client != nil {
  194. _ = client.Socket.WriteMessage(websocket.TextMessage, msg)
  195. }
  196. }
  197. return nil
  198. }
  199. func MessageNotify(uuid string, data model.SysMessage) {
  200. _, ok := Manager.Clients[uuid]
  201. if !ok { // 无匹配数据,直接报错
  202. g.Log("websocket").Error(fmt.Sprintf("uuid:%v 无匹配连接", uuid))
  203. return
  204. }
  205. // 发送消息
  206. go func() {
  207. var msg ServiceMessage
  208. msg.Type = "SendMessage"
  209. msg.Content = ServiceMessageContent{
  210. Body: data,
  211. }
  212. err := Send([]string{uuid}, msg)
  213. if err != nil {
  214. g.Log("websocket").Error(err)
  215. }
  216. }()
  217. }
  218. func CreateConnection(uuid string, r *ghttp.Request) {
  219. // 将http升级为websocket
  220. conn, err := r.WebSocket()
  221. if err != nil {
  222. g.Log("websocket").Error(err)
  223. http.NotFound(r.Response.Writer, r.Request)
  224. return
  225. }
  226. // 创建一个实例连接
  227. client := &Client{
  228. ID: uuid, // 连接id
  229. AccountId: uuid,
  230. HeartbeatTime: time.Now().Unix(),
  231. Socket: conn,
  232. }
  233. // 用户注册到用户连接管理
  234. RegisterChan <- client
  235. // 发起读取心跳消息
  236. go func() {
  237. defer func() {
  238. if r := recover(); r != nil {
  239. g.Log("websocket").Printf("MessageNotify read panic: %+v\n", r)
  240. }
  241. }()
  242. client.Read()
  243. }()
  244. }