| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
- package main
- import (
- "dashoo.cn/api/business/converseService"
- "dashoo.cn/api/controllers/converse"
- "dashoo.cn/utils"
- "fmt"
- "github.com/robfig/cron"
- "log"
- "os"
- "os/signal"
- "runtime"
- "strconv"
- "strings"
- "time"
- "github.com/gorilla/websocket"
- )
- //const gpServiceUrl = "ws://39.105.83.226:22225/socket"
- //const gpServiceUrl = "ws://39.105.83.226:22368/socket"
- //const gpServiceUrl = "ws://fdgfghfg.oicp.net:48681/socket"
- //const gpServiceUrl = "ws://fdgfghfg.oicp.net:23248/socket"
- const gpServiceUrl = "ws://270100d53m.wicp.vip:46538/socket"
- //var conn *websocket.Conn
- var err error
- var ctrl converse.ConverseController
- var disconnect = make(chan bool)
- func main() {
- utils.LoadConfig("conf/app.conf")
- // Initialize data.
- utils.InitDb()
- //定期清理已完成任务信息
- go ClearSuccessfulTask()
- interrupt := make(chan os.Signal, 1)
- signal.Notify(interrupt, os.Interrupt)
- var done = make(chan struct{})
- defer close(done)
- //建立websocket连接
- var conn *websocket.Conn
- conn = WebSocketFound()
- log.Println("连接成功: ", conn!=nil)
- if conn != nil {
- defer conn.Close()
- //建立连接
- ctrl.GetConnection(conn)
- //发送请求
- go ListenToSendMsg(conn)
- //监听消息
- go ListenToReadMsg(conn)
- }else{
- //设置连接失败
- go SetDisconnect()
- }
- for {
- select {
- case <-done:
- //return
- // case t := <-ticker.C:
- // err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))
- // if err != nil {
- // log.Println("write:", err)
- // return
- // }
- case <-interrupt:
- log.Println("interrupt")
- // Cleanly close the connection by sending a close message and then
- // waiting (with timeout) for the server to close the connection.
- err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
- if err != nil {
- log.Println("write close:", err)
- //return
- }
- select {
- case <-done:
- case <-time.After(time.Second*10):
- }
- //return
- case <-disconnect:
- log.Println("服务器连接已断开!")
- for {
- select{
- case <-time.After(time.Second*10):
- conn = WebSocketFound()
- if conn != nil {
- defer conn.Close()
- ctrl.GetConnection(conn)
- go ListenToSendMsg(conn)
- go ListenToReadMsg(conn)
- goto label
- }
- log.Println("连接失败, 等待重新连接........")
- }
- }
- }
- label:
- }
- log.Println("===============Service_down===============")
- //conn.Close()
- }
- func WebSocketFound() *websocket.Conn{
- log.Printf("connecting to %s", gpServiceUrl)
- conn, _, err := websocket.DefaultDialer.Dial(gpServiceUrl, nil)
- if err != nil {
- //log.Fatal("dial:", err)
- log.Println("dial err:", err)
- }
- return conn
- }
- // 监听消息
- func ListenToReadMsg(conn *websocket.Conn) (pid int){
- log.Println("监听消息>>>>>>>>>>>>>>>>>>>")
- pid = Goid()
- for {
- n, message, err := conn.ReadMessage()
- if err != nil {
- log.Println("read err:", err)
- disconnect <- true
- return
- }
- log.Printf("recv: %s", message)
- ctrl.HandleMessage(n,message)
- }
- }
- //发送请求
- func ListenToSendMsg(conn *websocket.Conn){
- log.Println("发送消息>>>>>>>>>>>>>>>>>>>")
- cron := cron.New()
- tick := "*/5 * * * * ?"
- err = cron.AddFunc(tick, func() {
- //如果已连接则发送任务
- reqMap := ctrl.SearchDatabase()
- if err != nil {
- log.Println("read err:", err)
- }
- //判断conn是否还在 todo
- for taskId,reqEntity := range reqMap {
- err := conn.WriteJSON(reqEntity)
- if err != nil {
- log.Println("write err:", err)
- }else{
- err = ctrl.StatusModify(converseService.SENT, taskId)
- }
- }
- })
- cron.Start()
- }
- //定期清理任务表已完成信息
- func ClearSuccessfulTask() {
- log.Println("清理任务定时任务启动>>>>>>>>>>>>>>>>>>>")
- cron := cron.New()
- //每天0时清除任务表
- tick := "0 0 0 * * ?"
- err = cron.AddFunc(tick, func() {
- ctrl.DeleteAccomplishedTask()
- if err != nil {
- log.Println("read:", err)
- }
- })
- cron.Start()
- }
- func SetDisconnect(){
- log.Println("设置连接断开状态")
- disconnect <- true
- }
- func Goid() int {
- defer func() {
- if err := recover(); err != nil {
- fmt.Println("panic recover:panic info:%v", err)
- }
- }()
- var buf [64]byte
- n := runtime.Stack(buf[:], false)
- idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))[0]
- id, err := strconv.Atoi(idField)
- if err != nil {
- panic(fmt.Sprintf("cannot get goroutine id: %v", err))
- }
- return id
- }
|