package service import ( "dashoo.cn/micro/app/model" "encoding/json" "fmt" "github.com/gogf/gf/container/gset" "github.com/gogf/gf/frame/g" "github.com/gogf/gf/net/ghttp" "github.com/gorilla/websocket" "net/http" "sync" "time" ) // 管理连接 func CheckRun() { // 检查心跳 go func() { defer func() { if r := recover(); r != nil { g.Log("websocket").Println(r) } }() heartbeat() }() // 注册注销 go func() { defer func() { if r := recover(); r != nil { g.Log("websocket").Println(r) } }() register() }() } // 客户端连接信息 type Client struct { ID string // 连接ID AccountId string // 账号id, 一个账号可能有多个连接 Socket *ghttp.WebSocket // 连接 HeartbeatTime int64 // 前一次心跳时间 } // 消息类型 const ( MessageTypeHeartbeat = "heartbeat" // 心跳 MessageTypeRegister = "register" // 注册 HeartbeatCheckTime = 9 // 心跳检测几秒检测一次 HeartbeatTime = 20 // 心跳距离上一次的最大时间 ChanBufferRegister = 100 // 注册chan缓冲 ChanBufferUnregister = 100 // 注销chan大小 ) // 客户端管理 type ClientManager struct { Clients map[string]*Client // 保存连接 Accounts map[string][]string // 账号和连接关系,map的key是账号id即:AccountId,这里主要考虑到一个账号多个连接 // 本系统无多连接情况 mu *sync.Mutex } // 定义一个管理Manager var Manager = ClientManager{ Clients: make(map[string]*Client), // 参与连接的用户,出于性能的考虑,需要设置最大连接数 Accounts: make(map[string][]string), // 账号和连接关系 // 本系统无多连接情况 mu: new(sync.Mutex), } var ( RegisterChan = make(chan *Client, ChanBufferRegister) // 注册 unregisterChan = make(chan *Client, ChanBufferUnregister) // 注销 ) // 封装回复消息 type ServiceMessage struct { Type string `json:"type"` // 类型 Content ServiceMessageContent `json:"content"` } // Content type ServiceMessageContent struct { Body interface{} `json:"body"` // 主要数据 MetaData interface{} `json:"meta_data"` // 扩展数据 } // 创建消息 func CreateReplyMsg(t string, content ServiceMessageContent) []byte { replyMsg := ServiceMessage{ Type: t, Content: content, } msg, _ := json.Marshal(replyMsg) return msg } // 注册注销 func register() { for { select { case conn := <-RegisterChan: // 新注册,新连接 // 加入连接,进行管理 accountBind(conn) // 回复消息 content := CreateReplyMsg(MessageTypeRegister, ServiceMessageContent{}) _ = conn.Socket.WriteMessage(websocket.TextMessage, content) case conn := <-unregisterChan: // 注销,或者没有心跳 // 关闭连接 _ = conn.Socket.Close() // 删除Client accountUnBind(conn) } } } // 绑定账号 func accountBind(c *Client) { Manager.mu.Lock() // 如果不存在链接,加入到连接;若已存在,则替换 Manager.Clients[c.ID] = c if !gset.NewStrSetFrom(Manager.Accounts[c.AccountId]).Contains(c.ID) { Manager.Accounts[c.AccountId] = append(Manager.Accounts[c.AccountId], c.ID) } Manager.mu.Unlock() } // 解绑账号 func accountUnBind(c *Client) { Manager.mu.Lock() // 取消连接 if err := c.Socket.Close(); err != nil { g.Log("websocket").Error(err) } delete(Manager.Clients, c.ID) accountSet := gset.NewStrSetFrom(Manager.Accounts[c.AccountId]) if accountSet.Contains(c.ID) { accountSet.Remove(c.ID) Manager.Accounts[c.AccountId] = accountSet.Slice() } Manager.mu.Unlock() } // 维持心跳 func heartbeat() { for { // 获取所有的Clients Manager.mu.Lock() var clients []*Client for _, c := range Manager.Clients { clients = append(clients, c) } Manager.mu.Unlock() g.Log("websocket").Info("开始本次心跳:") for _, c := range clients { g.Log("websocket").Info(c.ID) if time.Now().Unix()-c.HeartbeatTime > HeartbeatTime { accountUnBind(c) } } g.Log("websocket").Info("结束本次心跳:") time.Sleep(time.Second * HeartbeatCheckTime) } } // 根据账号获取连接 func GetClient(userId string) *Client { Manager.mu.Lock() if c, ok := Manager.Clients[userId]; ok { Manager.mu.Unlock() return c } Manager.mu.Unlock() return nil } func GetAccountClient(userId string) []string { Manager.mu.Lock() if c, ok := Manager.Accounts[userId]; ok { Manager.mu.Unlock() return c } Manager.mu.Unlock() return nil } // 读取信息,即收到消息 TODO 服务端读取客户的返回消息,并维持心跳 func (c *Client) Read() { defer func() { g.Log("websocket").Info("历史数据关闭") _ = c.Socket.Close() }() for { // 读取消息 _, body, err := c.Socket.ReadMessage() if err != nil { break } var msg struct { Type string `json:"type"` } err = json.Unmarshal(body, &msg) if err != nil { g.Log("websocket").Error(err) continue } if msg.Type == MessageTypeHeartbeat { // 维持心跳消息 // 刷新连接时间 c.HeartbeatTime = time.Now().Unix() // 回复心跳 replyMsg := CreateReplyMsg(MessageTypeHeartbeat, ServiceMessageContent{}) err = c.Socket.WriteMessage(websocket.TextMessage, replyMsg) if err != nil { g.Log("websocket").Error(err) } continue } } } // 发送消息 TODO 服务端向客户端发送消息 func Send(userIds []string, message ServiceMessage) error { msg, err := json.Marshal(message) if err != nil { return err } for _, userId := range userIds { links := GetAccountClient(userId) for _, link := range links { // 获取连接id client := GetClient(link) if client != nil { _ = client.Socket.WriteMessage(websocket.TextMessage, msg) } } } return nil } func MessageNotify(userId string, data model.SysMessage) { arr, ok := Manager.Accounts[userId] if !ok || len(arr) == 0 { // 无匹配数据,直接报错 g.Log("websocket").Error(fmt.Sprintf("用户ID:%v 无匹配连接", userId)) return } // 发送消息 go func() { var msg ServiceMessage msg.Type = "SendMessage" msg.Content = ServiceMessageContent{ Body: data, } err := Send([]string{userId}, msg) if err != nil { g.Log("websocket").Error(err) } }() } func CreateConnection(accountId, link string, r *ghttp.Request) { // 将http升级为websocket conn, err := r.WebSocket() if err != nil { g.Log("websocket").Error(err) http.NotFound(r.Response.Writer, r.Request) return } // 创建一个实例连接 client := &Client{ ID: link, // 连接id AccountId: accountId, HeartbeatTime: time.Now().Unix(), Socket: conn, } // 用户注册到用户连接管理 RegisterChan <- client // 发起读取心跳消息 go func() { defer func() { if r := recover(); r != nil { g.Log("websocket").Printf("MessageNotify read panic: %+v\n", r) } }() client.Read() }() }