websocket_manager.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. package service
  2. import (
  3. "dashoo.cn/micro/app/model"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/gogf/gf/net/ghttp"
  7. "github.com/gogf/gf/os/glog"
  8. "github.com/gorilla/websocket"
  9. "log"
  10. "net/http"
  11. "sync"
  12. "time"
  13. )
  14. // 管理连接
  15. func CheckRun() {
  16. // 检查心跳
  17. go func() {
  18. defer func() {
  19. if r := recover(); r != nil {
  20. log.Println(r)
  21. }
  22. }()
  23. heartbeat()
  24. }()
  25. // 注册注销
  26. go func() {
  27. defer func() {
  28. if r := recover(); r != nil {
  29. log.Println(r)
  30. }
  31. }()
  32. register()
  33. }()
  34. }
  35. // 客户端连接信息
  36. type Client struct {
  37. ID string // 连接ID
  38. AccountId string // 账号id, 一个账号可能有多个连接
  39. Socket *ghttp.WebSocket // 连接
  40. HeartbeatTime int64 // 前一次心跳时间
  41. }
  42. // 消息类型
  43. const (
  44. MessageTypeHeartbeat = "heartbeat" // 心跳
  45. MessageTypeRegister = "register" // 注册
  46. HeartbeatCheckTime = 9 // 心跳检测几秒检测一次
  47. HeartbeatTime = 20 // 心跳距离上一次的最大时间
  48. ChanBufferRegister = 100 // 注册chan缓冲
  49. ChanBufferUnregister = 100 // 注销chan大小
  50. )
  51. // 客户端管理
  52. type ClientManager struct {
  53. Clients map[string]*Client // 保存连接
  54. //Accounts map[string][]string // 账号和连接关系,map的key是账号id即:AccountId,这里主要考虑到一个账号多个连接 // 本系统无多连接情况
  55. mu *sync.Mutex
  56. }
  57. // 定义一个管理Manager
  58. var Manager = ClientManager{
  59. Clients: make(map[string]*Client), // 参与连接的用户,出于性能的考虑,需要设置最大连接数
  60. //Accounts: make(map[string][]string), // 账号和连接关系 // 本系统无多连接情况
  61. mu: new(sync.Mutex),
  62. }
  63. var (
  64. RegisterChan = make(chan *Client, ChanBufferRegister) // 注册
  65. unregisterChan = make(chan *Client, ChanBufferUnregister) // 注销
  66. )
  67. // 封装回复消息
  68. type ServiceMessage struct {
  69. Type string `json:"type"` // 类型
  70. Content ServiceMessageContent `json:"content"`
  71. }
  72. // Content
  73. type ServiceMessageContent struct {
  74. Body interface{} `json:"body"` // 主要数据
  75. MetaData interface{} `json:"meta_data"` // 扩展数据
  76. }
  77. // 创建消息
  78. func CreateReplyMsg(t string, content ServiceMessageContent) []byte {
  79. replyMsg := ServiceMessage{
  80. Type: t,
  81. Content: content,
  82. }
  83. msg, _ := json.Marshal(replyMsg)
  84. return msg
  85. }
  86. // 注册注销
  87. func register() {
  88. for {
  89. select {
  90. case conn := <-RegisterChan: // 新注册,新连接
  91. // 加入连接,进行管理
  92. accountBind(conn)
  93. // 回复消息
  94. content := CreateReplyMsg(MessageTypeRegister, ServiceMessageContent{})
  95. _ = conn.Socket.WriteMessage(websocket.TextMessage, content)
  96. case conn := <-unregisterChan: // 注销,或者没有心跳
  97. // 关闭连接
  98. _ = conn.Socket.Close()
  99. // 删除Client
  100. accountUnBind(conn)
  101. }
  102. }
  103. }
  104. // 绑定账号
  105. func accountBind(c *Client) {
  106. Manager.mu.Lock()
  107. // 如果不存在链接,加入到连接;若已存在,则替换
  108. Manager.Clients[c.ID] = c
  109. Manager.mu.Unlock()
  110. }
  111. // 解绑账号
  112. func accountUnBind(c *Client) {
  113. Manager.mu.Lock()
  114. // 取消连接
  115. if err := c.Socket.Close(); err != nil {
  116. glog.Error(err)
  117. }
  118. delete(Manager.Clients, c.ID)
  119. Manager.mu.Unlock()
  120. }
  121. // 维持心跳
  122. func heartbeat() {
  123. for {
  124. // 获取所有的Clients
  125. Manager.mu.Lock()
  126. var clients []*Client
  127. for _, c := range Manager.Clients {
  128. clients = append(clients, c)
  129. }
  130. Manager.mu.Unlock()
  131. glog.Info("开始本次心跳:")
  132. for _, c := range clients {
  133. glog.Info(c.ID)
  134. if time.Now().Unix()-c.HeartbeatTime > HeartbeatTime {
  135. accountUnBind(c)
  136. }
  137. }
  138. glog.Info("结束本次心跳:")
  139. time.Sleep(time.Second * HeartbeatCheckTime)
  140. }
  141. }
  142. // 根据账号获取连接
  143. func GetClient(uuid string) *Client {
  144. Manager.mu.Lock()
  145. if c, ok := Manager.Clients[uuid]; ok {
  146. Manager.mu.Unlock()
  147. return c
  148. }
  149. Manager.mu.Unlock()
  150. return nil
  151. }
  152. // 读取信息,即收到消息 TODO 服务端读取客户的返回消息,并维持心跳
  153. func (c *Client) Read() {
  154. defer func() {
  155. fmt.Println("历史数据关闭")
  156. _ = c.Socket.Close()
  157. }()
  158. for {
  159. // 读取消息
  160. _, body, err := c.Socket.ReadMessage()
  161. if err != nil {
  162. break
  163. }
  164. var msg struct {
  165. Type string `json:"type"`
  166. }
  167. err = json.Unmarshal(body, &msg)
  168. if err != nil {
  169. log.Println(err)
  170. continue
  171. }
  172. if msg.Type == MessageTypeHeartbeat { // 维持心跳消息
  173. // 刷新连接时间
  174. c.HeartbeatTime = time.Now().Unix()
  175. // 回复心跳
  176. replyMsg := CreateReplyMsg(MessageTypeHeartbeat, ServiceMessageContent{})
  177. err = c.Socket.WriteMessage(websocket.TextMessage, replyMsg)
  178. if err != nil {
  179. log.Println(err)
  180. }
  181. continue
  182. }
  183. }
  184. }
  185. // 发送消息 TODO 服务端向客户端发送消息
  186. func Send(uuids []string, message ServiceMessage) error {
  187. msg, err := json.Marshal(message)
  188. if err != nil {
  189. return err
  190. }
  191. for _, uuid := range uuids {
  192. // 获取连接id
  193. client := GetClient(uuid)
  194. if client != nil {
  195. _ = client.Socket.WriteMessage(websocket.TextMessage, msg)
  196. }
  197. }
  198. return nil
  199. }
  200. func MessageNotify(uuid string, data model.SysMessage) {
  201. _, ok := Manager.Clients[uuid]
  202. if !ok { // 无匹配数据,直接报错
  203. glog.Error(fmt.Sprintf("uuid:%v 无匹配连接", uuid))
  204. return
  205. }
  206. // 发送消息
  207. go func() {
  208. var msg ServiceMessage
  209. msg.Type = "SendMessage"
  210. msg.Content = ServiceMessageContent{
  211. Body: data,
  212. }
  213. err := Send([]string{uuid}, msg)
  214. if err != nil {
  215. log.Println(err)
  216. }
  217. }()
  218. }
  219. func CreateConnection(uuid string, r *ghttp.Request) {
  220. // 将http升级为websocket
  221. conn, err := r.WebSocket()
  222. if err != nil {
  223. log.Println(err)
  224. http.NotFound(r.Response.Writer, r.Request)
  225. return
  226. }
  227. // 创建一个实例连接
  228. client := &Client{
  229. ID: uuid, // 连接id
  230. AccountId: uuid,
  231. HeartbeatTime: time.Now().Unix(),
  232. Socket: conn,
  233. }
  234. // 用户注册到用户连接管理
  235. RegisterChan <- client
  236. // 发起读取心跳消息
  237. go func() {
  238. defer func() {
  239. if r := recover(); r != nil {
  240. log.Printf("MessageNotify read panic: %+v\n", r)
  241. }
  242. }()
  243. client.Read()
  244. }()
  245. }