package main import ( "dashoo.cn/api/business/converseService" "dashoo.cn/api/controllers/converse" "dashoo.cn/utils" "fmt" "github.com/robfig/cron" "log" "os" "os/signal" "runtime" "strconv" "strings" "time" "github.com/gorilla/websocket" ) //const gpServiceUrl = "ws://39.105.83.226:22225/socket" //const gpServiceUrl = "ws://39.105.83.226:22368/socket" //const gpServiceUrl = "ws://fdgfghfg.oicp.net:48681/socket" //const gpServiceUrl = "ws://fdgfghfg.oicp.net:23248/socket" const gpServiceUrl = "ws://270100d53m.wicp.vip:46538/socket" //var conn *websocket.Conn var err error var ctrl converse.ConverseController var disconnect = make(chan bool) func main() { utils.LoadConfig("conf/app.conf") // Initialize data. utils.InitDb() //定期清理已完成任务信息 go ClearSuccessfulTask() interrupt := make(chan os.Signal, 1) signal.Notify(interrupt, os.Interrupt) var done = make(chan struct{}) defer close(done) //建立websocket连接 var conn *websocket.Conn conn = WebSocketFound() log.Println("连接成功: ", conn!=nil) if conn != nil { defer conn.Close() //建立连接 ctrl.GetConnection(conn) //发送请求 go ListenToSendMsg(conn) //监听消息 go ListenToReadMsg(conn) }else{ //设置连接失败 go SetDisconnect() } for { select { case <-done: //return // case t := <-ticker.C: // err := c.WriteMessage(websocket.TextMessage, []byte(t.String())) // if err != nil { // log.Println("write:", err) // return // } case <-interrupt: log.Println("interrupt") // Cleanly close the connection by sending a close message and then // waiting (with timeout) for the server to close the connection. err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) if err != nil { log.Println("write close:", err) //return } select { case <-done: case <-time.After(time.Second*10): } //return case <-disconnect: log.Println("服务器连接已断开!") for { select{ case <-time.After(time.Second*10): conn = WebSocketFound() if conn != nil { defer conn.Close() ctrl.GetConnection(conn) go ListenToSendMsg(conn) go ListenToReadMsg(conn) goto label } log.Println("连接失败, 等待重新连接........") } } } label: } log.Println("===============Service_down===============") //conn.Close() } func WebSocketFound() *websocket.Conn{ log.Printf("connecting to %s", gpServiceUrl) conn, _, err := websocket.DefaultDialer.Dial(gpServiceUrl, nil) if err != nil { //log.Fatal("dial:", err) log.Println("dial err:", err) } return conn } // 监听消息 func ListenToReadMsg(conn *websocket.Conn) (pid int){ log.Println("监听消息>>>>>>>>>>>>>>>>>>>") pid = Goid() for { n, message, err := conn.ReadMessage() if err != nil { log.Println("read err:", err) disconnect <- true return } log.Printf("recv: %s", message) ctrl.HandleMessage(n,message) } } //发送请求 func ListenToSendMsg(conn *websocket.Conn){ log.Println("发送消息>>>>>>>>>>>>>>>>>>>") cron := cron.New() tick := "*/5 * * * * ?" err = cron.AddFunc(tick, func() { //如果已连接则发送任务 reqMap := ctrl.SearchDatabase() if err != nil { log.Println("read err:", err) } //判断conn是否还在 todo for taskId,reqEntity := range reqMap { err := conn.WriteJSON(reqEntity) if err != nil { log.Println("write err:", err) }else{ err = ctrl.StatusModify(converseService.SENT, taskId) } } }) cron.Start() } //定期清理任务表已完成信息 func ClearSuccessfulTask() { log.Println("清理任务定时任务启动>>>>>>>>>>>>>>>>>>>") cron := cron.New() //每天0时清除任务表 tick := "0 0 0 * * ?" err = cron.AddFunc(tick, func() { ctrl.DeleteAccomplishedTask() if err != nil { log.Println("read:", err) } }) cron.Start() } func SetDisconnect(){ log.Println("设置连接断开状态") disconnect <- true } func Goid() int { defer func() { if err := recover(); err != nil { fmt.Println("panic recover:panic info:%v", err) } }() var buf [64]byte n := runtime.Stack(buf[:], false) idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))[0] id, err := strconv.Atoi(idField) if err != nil { panic(fmt.Sprintf("cannot get goroutine id: %v", err)) } return id }