websocket_manager.go 6.8 KB


  1. package service
  2. import (
  3. "dashoo.cn/micro/app/model"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/gogf/gf/container/gset"
  7. "github.com/gogf/gf/frame/g"
  8. "github.com/gogf/gf/net/ghttp"
  9. "github.com/gorilla/websocket"
  10. "net/http"
  11. "sync"
  12. "time"
  13. )
  14. // 管理连接
  15. func CheckRun() {
  16. // 检查心跳
  17. go func() {
  18. defer func() {
  19. if r := recover(); r != nil {
  20. g.Log("websocket").Println(r)
  21. }
  22. }()
  23. heartbeat()
  24. }()
  25. // 注册注销
  26. go func() {
  27. defer func() {
  28. if r := recover(); r != nil {
  29. g.Log("websocket").Println(r)
  30. }
  31. }()
  32. register()
  33. }()
  34. }
  35. // 客户端连接信息
  36. type Client struct {
  37. ID string // 连接ID
  38. AccountId string // 账号id, 一个账号可能有多个连接
  39. Socket *ghttp.WebSocket // 连接
  40. HeartbeatTime int64 // 前一次心跳时间
  41. }
  42. // 消息类型
  43. const (
  44. MessageTypeHeartbeat = "heartbeat" // 心跳
  45. MessageTypeRegister = "register" // 注册
  46. HeartbeatCheckTime = 9 // 心跳检测几秒检测一次
  47. HeartbeatTime = 20 // 心跳距离上一次的最大时间
  48. ChanBufferRegister = 100 // 注册chan缓冲
  49. ChanBufferUnregister = 100 // 注销chan大小
  50. )
  51. // 客户端管理
  52. type ClientManager struct {
  53. Clients map[string]*Client // 保存连接
  54. Accounts map[string][]string // 账号和连接关系,map的key是账号id即:AccountId,这里主要考虑到一个账号多个连接 // 本系统无多连接情况
  55. mu *sync.Mutex
  56. }
  57. // 定义一个管理Manager
  58. var Manager = ClientManager{
  59. Clients: make(map[string]*Client), // 参与连接的用户,出于性能的考虑,需要设置最大连接数
  60. Accounts: make(map[string][]string), // 账号和连接关系 // 本系统无多连接情况
  61. mu: new(sync.Mutex),
  62. }
  63. var (
  64. RegisterChan = make(chan *Client, ChanBufferRegister) // 注册
  65. unregisterChan = make(chan *Client, ChanBufferUnregister) // 注销
  66. )
  67. // 封装回复消息
  68. type ServiceMessage struct {
  69. Type string `json:"type"` // 类型
  70. Content ServiceMessageContent `json:"content"`
  71. }
  72. // Content
  73. type ServiceMessageContent struct {
  74. Body interface{} `json:"body"` // 主要数据
  75. MetaData interface{} `json:"meta_data"` // 扩展数据
  76. }
  77. // 创建消息
  78. func CreateReplyMsg(t string, content ServiceMessageContent) []byte {
  79. replyMsg := ServiceMessage{
  80. Type: t,
  81. Content: content,
  82. }
  83. msg, _ := json.Marshal(replyMsg)
  84. return msg
  85. }
  86. // 注册注销
  87. func register() {
  88. for {
  89. select {
  90. case conn := <-RegisterChan: // 新注册,新连接
  91. // 加入连接,进行管理
  92. accountBind(conn)
  93. // 回复消息
  94. content := CreateReplyMsg(MessageTypeRegister, ServiceMessageContent{})
  95. _ = conn.Socket.WriteMessage(websocket.TextMessage, content)
  96. case conn := <-unregisterChan: // 注销,或者没有心跳
  97. // 关闭连接
  98. _ = conn.Socket.Close()
  99. // 删除Client
  100. accountUnBind(conn)
  101. }
  102. }
  103. }
  104. // 绑定账号
  105. func accountBind(c *Client) {
  106. Manager.mu.Lock()
  107. // 如果不存在链接,加入到连接;若已存在,则替换
  108. Manager.Clients[c.ID] = c
  109. if !gset.NewStrSetFrom(Manager.Accounts[c.AccountId]).Contains(c.ID) {
  110. Manager.Accounts[c.AccountId] = append(Manager.Accounts[c.AccountId], c.ID)
  111. }
  112. Manager.mu.Unlock()
  113. }
  114. // 解绑账号
  115. func accountUnBind(c *Client) {
  116. Manager.mu.Lock()
  117. // 取消连接
  118. if err := c.Socket.Close(); err != nil {
  119. g.Log("websocket").Error(err)
  120. }
  121. delete(Manager.Clients, c.ID)
  122. accountSet := gset.NewStrSetFrom(Manager.Accounts[c.AccountId])
  123. if accountSet.Contains(c.ID) {
  124. accountSet.Remove(c.ID)
  125. Manager.Accounts[c.AccountId] = accountSet.Slice()
  126. }
  127. Manager.mu.Unlock()
  128. }
  129. // 维持心跳
  130. func heartbeat() {
  131. for {
  132. // 获取所有的Clients
  133. Manager.mu.Lock()
  134. var clients []*Client
  135. for _, c := range Manager.Clients {
  136. clients = append(clients, c)
  137. }
  138. Manager.mu.Unlock()
  139. g.Log("websocket").Info("开始本次心跳:")
  140. for _, c := range clients {
  141. g.Log("websocket").Info(c.ID)
  142. if time.Now().Unix()-c.HeartbeatTime > HeartbeatTime {
  143. accountUnBind(c)
  144. }
  145. }
  146. g.Log("websocket").Info("结束本次心跳:")
  147. time.Sleep(time.Second * HeartbeatCheckTime)
  148. }
  149. }
  150. // 根据账号获取连接
  151. func GetClient(userId string) *Client {
  152. Manager.mu.Lock()
  153. if c, ok := Manager.Clients[userId]; ok {
  154. Manager.mu.Unlock()
  155. return c
  156. }
  157. Manager.mu.Unlock()
  158. return nil
  159. }
  160. func GetAccountClient(userId string) []string {
  161. Manager.mu.Lock()
  162. if c, ok := Manager.Accounts[userId]; ok {
  163. Manager.mu.Unlock()
  164. return c
  165. }
  166. Manager.mu.Unlock()
  167. return nil
  168. }
  169. // 读取信息,即收到消息 TODO 服务端读取客户的返回消息,并维持心跳
  170. func (c *Client) Read() {
  171. defer func() {
  172. g.Log("websocket").Info("历史数据关闭")
  173. _ = c.Socket.Close()
  174. }()
  175. for {
  176. // 读取消息
  177. _, body, err := c.Socket.ReadMessage()
  178. if err != nil {
  179. break
  180. }
  181. var msg struct {
  182. Type string `json:"type"`
  183. }
  184. err = json.Unmarshal(body, &msg)
  185. if err != nil {
  186. g.Log("websocket").Error(err)
  187. continue
  188. }
  189. if msg.Type == MessageTypeHeartbeat { // 维持心跳消息
  190. // 刷新连接时间
  191. c.HeartbeatTime = time.Now().Unix()
  192. // 回复心跳
  193. replyMsg := CreateReplyMsg(MessageTypeHeartbeat, ServiceMessageContent{})
  194. err = c.Socket.WriteMessage(websocket.TextMessage, replyMsg)
  195. if err != nil {
  196. g.Log("websocket").Error(err)
  197. }
  198. continue
  199. }
  200. }
  201. }
  202. // 发送消息 TODO 服务端向客户端发送消息
  203. func Send(userIds []string, message ServiceMessage) error {
  204. msg, err := json.Marshal(message)
  205. if err != nil {
  206. return err
  207. }
  208. for _, userId := range userIds {
  209. links := GetAccountClient(userId)
  210. for _, link := range links {
  211. // 获取连接id
  212. client := GetClient(link)
  213. if client != nil {
  214. _ = client.Socket.WriteMessage(websocket.TextMessage, msg)
  215. }
  216. }
  217. }
  218. return nil
  219. }
  220. func MessageNotify(userId string, data model.SysMessage) {
  221. arr, ok := Manager.Accounts[userId]
  222. if !ok || len(arr) == 0 { // 无匹配数据,直接报错
  223. g.Log("websocket").Error(fmt.Sprintf("用户ID:%v 无匹配连接", userId))
  224. return
  225. }
  226. // 发送消息
  227. go func() {
  228. var msg ServiceMessage
  229. msg.Type = "SendMessage"
  230. msg.Content = ServiceMessageContent{
  231. Body: data,
  232. }
  233. err := Send([]string{userId}, msg)
  234. if err != nil {
  235. g.Log("websocket").Error(err)
  236. }
  237. }()
  238. }
  239. func CreateConnection(accountId, link string, r *ghttp.Request) {
  240. // 将http升级为websocket
  241. conn, err := r.WebSocket()
  242. if err != nil {
  243. g.Log("websocket").Error(err)
  244. http.NotFound(r.Response.Writer, r.Request)
  245. return
  246. }
  247. // 创建一个实例连接
  248. client := &Client{
  249. ID: link, // 连接id
  250. AccountId: accountId,
  251. HeartbeatTime: time.Now().Unix(),
  252. Socket: conn,
  253. }
  254. // 用户注册到用户连接管理
  255. RegisterChan <- client
  256. // 发起读取心跳消息
  257. go func() {
  258. defer func() {
  259. if r := recover(); r != nil {
  260. g.Log("websocket").Printf("MessageNotify read panic: %+v\n", r)
  261. }
  262. }()
  263. client.Read()
  264. }()
  265. }