| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302 |
- package service
- import (
- "dashoo.cn/micro/app/model"
- "encoding/json"
- "fmt"
- "github.com/gogf/gf/container/gset"
- "github.com/gogf/gf/frame/g"
- "github.com/gogf/gf/net/ghttp"
- "github.com/gorilla/websocket"
- "net/http"
- "sync"
- "time"
- )
- // 管理连接
- func CheckRun() {
- // 检查心跳
- go func() {
- defer func() {
- if r := recover(); r != nil {
- g.Log("websocket").Println(r)
- }
- }()
- heartbeat()
- }()
- // 注册注销
- go func() {
- defer func() {
- if r := recover(); r != nil {
- g.Log("websocket").Println(r)
- }
- }()
- register()
- }()
- }
- // 客户端连接信息
- type Client struct {
- ID string // 连接ID
- AccountId string // 账号id, 一个账号可能有多个连接
- Socket *ghttp.WebSocket // 连接
- HeartbeatTime int64 // 前一次心跳时间
- }
- // 消息类型
- const (
- MessageTypeHeartbeat = "heartbeat" // 心跳
- MessageTypeRegister = "register" // 注册
- HeartbeatCheckTime = 9 // 心跳检测几秒检测一次
- HeartbeatTime = 20 // 心跳距离上一次的最大时间
- ChanBufferRegister = 100 // 注册chan缓冲
- ChanBufferUnregister = 100 // 注销chan大小
- )
- // 客户端管理
- type ClientManager struct {
- Clients map[string]*Client // 保存连接
- Accounts map[string][]string // 账号和连接关系,map的key是账号id即:AccountId,这里主要考虑到一个账号多个连接 // 本系统无多连接情况
- mu *sync.Mutex
- }
- // 定义一个管理Manager
- var Manager = ClientManager{
- Clients: make(map[string]*Client), // 参与连接的用户,出于性能的考虑,需要设置最大连接数
- Accounts: make(map[string][]string), // 账号和连接关系 // 本系统无多连接情况
- mu: new(sync.Mutex),
- }
- var (
- RegisterChan = make(chan *Client, ChanBufferRegister) // 注册
- unregisterChan = make(chan *Client, ChanBufferUnregister) // 注销
- )
- // 封装回复消息
- type ServiceMessage struct {
- Type string `json:"type"` // 类型
- Content ServiceMessageContent `json:"content"`
- }
- // Content
- type ServiceMessageContent struct {
- Body interface{} `json:"body"` // 主要数据
- MetaData interface{} `json:"meta_data"` // 扩展数据
- }
- // 创建消息
- func CreateReplyMsg(t string, content ServiceMessageContent) []byte {
- replyMsg := ServiceMessage{
- Type: t,
- Content: content,
- }
- msg, _ := json.Marshal(replyMsg)
- return msg
- }
- // 注册注销
- func register() {
- for {
- select {
- case conn := <-RegisterChan: // 新注册,新连接
- // 加入连接,进行管理
- accountBind(conn)
- // 回复消息
- content := CreateReplyMsg(MessageTypeRegister, ServiceMessageContent{})
- _ = conn.Socket.WriteMessage(websocket.TextMessage, content)
- case conn := <-unregisterChan: // 注销,或者没有心跳
- // 关闭连接
- _ = conn.Socket.Close()
- // 删除Client
- accountUnBind(conn)
- }
- }
- }
- // 绑定账号
- func accountBind(c *Client) {
- Manager.mu.Lock()
- // 如果不存在链接,加入到连接;若已存在,则替换
- Manager.Clients[c.ID] = c
- if !gset.NewStrSetFrom(Manager.Accounts[c.AccountId]).Contains(c.ID) {
- Manager.Accounts[c.AccountId] = append(Manager.Accounts[c.AccountId], c.ID)
- }
- Manager.mu.Unlock()
- }
- // 解绑账号
- func accountUnBind(c *Client) {
- Manager.mu.Lock()
- // 取消连接
- if err := c.Socket.Close(); err != nil {
- g.Log("websocket").Error(err)
- }
- delete(Manager.Clients, c.ID)
- accountSet := gset.NewStrSetFrom(Manager.Accounts[c.AccountId])
- if accountSet.Contains(c.ID) {
- accountSet.Remove(c.ID)
- Manager.Accounts[c.AccountId] = accountSet.Slice()
- }
- Manager.mu.Unlock()
- }
- // 维持心跳
- func heartbeat() {
- for {
- // 获取所有的Clients
- Manager.mu.Lock()
- var clients []*Client
- for _, c := range Manager.Clients {
- clients = append(clients, c)
- }
- Manager.mu.Unlock()
- g.Log("websocket").Info("开始本次心跳:")
- for _, c := range clients {
- g.Log("websocket").Info(c.ID)
- if time.Now().Unix()-c.HeartbeatTime > HeartbeatTime {
- accountUnBind(c)
- }
- }
- g.Log("websocket").Info("结束本次心跳:")
- time.Sleep(time.Second * HeartbeatCheckTime)
- }
- }
- // 根据账号获取连接
- func GetClient(userId string) *Client {
- Manager.mu.Lock()
- if c, ok := Manager.Clients[userId]; ok {
- Manager.mu.Unlock()
- return c
- }
- Manager.mu.Unlock()
- return nil
- }
- func GetAccountClient(userId string) []string {
- Manager.mu.Lock()
- if c, ok := Manager.Accounts[userId]; ok {
- Manager.mu.Unlock()
- return c
- }
- Manager.mu.Unlock()
- return nil
- }
- // 读取信息,即收到消息 TODO 服务端读取客户的返回消息,并维持心跳
- func (c *Client) Read() {
- defer func() {
- g.Log("websocket").Info("历史数据关闭")
- _ = c.Socket.Close()
- }()
- for {
- // 读取消息
- _, body, err := c.Socket.ReadMessage()
- if err != nil {
- break
- }
- var msg struct {
- Type string `json:"type"`
- }
- err = json.Unmarshal(body, &msg)
- if err != nil {
- g.Log("websocket").Error(err)
- continue
- }
- if msg.Type == MessageTypeHeartbeat { // 维持心跳消息
- // 刷新连接时间
- c.HeartbeatTime = time.Now().Unix()
- // 回复心跳
- replyMsg := CreateReplyMsg(MessageTypeHeartbeat, ServiceMessageContent{})
- err = c.Socket.WriteMessage(websocket.TextMessage, replyMsg)
- if err != nil {
- g.Log("websocket").Error(err)
- }
- continue
- }
- }
- }
- // 发送消息 TODO 服务端向客户端发送消息
- func Send(userIds []string, message ServiceMessage) error {
- msg, err := json.Marshal(message)
- if err != nil {
- return err
- }
- for _, userId := range userIds {
- links := GetAccountClient(userId)
- for _, link := range links {
- // 获取连接id
- client := GetClient(link)
- if client != nil {
- _ = client.Socket.WriteMessage(websocket.TextMessage, msg)
- }
- }
- }
- return nil
- }
- func MessageNotify(userId string, data model.SysMessage) {
- arr, ok := Manager.Accounts[userId]
- if !ok || len(arr) == 0 { // 无匹配数据,直接报错
- g.Log("websocket").Error(fmt.Sprintf("用户ID:%v 无匹配连接", userId))
- return
- }
- // 发送消息
- go func() {
- var msg ServiceMessage
- msg.Type = "SendMessage"
- msg.Content = ServiceMessageContent{
- Body: data,
- }
- err := Send([]string{userId}, msg)
- if err != nil {
- g.Log("websocket").Error(err)
- }
- }()
- }
- func CreateConnection(accountId, link string, r *ghttp.Request) {
- // 将http升级为websocket
- conn, err := r.WebSocket()
- if err != nil {
- g.Log("websocket").Error(err)
- http.NotFound(r.Response.Writer, r.Request)
- return
- }
- // 创建一个实例连接
- client := &Client{
- ID: link, // 连接id
- AccountId: accountId,
- HeartbeatTime: time.Now().Unix(),
- Socket: conn,
- }
- // 用户注册到用户连接管理
- RegisterChan <- client
- // 发起读取心跳消息
- go func() {
- defer func() {
- if r := recover(); r != nil {
- g.Log("websocket").Printf("MessageNotify read panic: %+v\n", r)
- }
- }()
- client.Read()
- }()
- }
|