package main import ( "fmt" "github.com/astaxie/beego" "io/ioutil" "log" "os" "os/exec" "os/signal" "runtime" "strconv" "strings" "time" "github.com/gorilla/websocket" "github.com/robfig/cron" "dashoo.cn/genepoint_srv/business/converseService" "dashoo.cn/genepoint_srv/controllers/converse" "dashoo.cn/utils" ) //const gpServiceUrl = "ws://39.105.83.226:22225/socket" //const gpServiceUrl = "ws://39.105.83.226:22368/socket" //const gpServiceUrl = "ws://fdgfghfg.oicp.net:48681/socket" //const gpServiceUrl = "ws://fdgfghfg.oicp.net:23248/socket" //var gpServiceUrl = "ws://270100d53m.wicp.vip:46538/socket" //var gpServiceUrl = "ws://39.105.83.226:22225/socket" // 基点接口服务器地址 var gpServiceUrl string var temp bool var timer *time.Timer var err error // 任务处理控制层 var ctrl converse.ConverseController // 连接断开标识 var disconnect = make(chan bool) // 是否执行任务标识 var cronstart = false var timeOut = 40 * time.Second //全局websocket连接 var wsConn *websocket.Conn // 避免重复连接标识 var lockReconnect = false // 连接超时时长 var heartTimeOut = 10 * time.Second // 初始心跳时间 var heartTime = time.Now() // 初始心跳计时器 var pingTimer = time.NewTicker(10 * time.Second) func main() { temp = false // 加载配置文件中服务器地址 utils.LoadConfig("conf/app.conf") // 初始化基点接口服务器 initGpServiceUrl() // Initialize data. utils.InitDb() //定期清理已完成任务信息 go ClearSuccessfulTask() // 备份数据库 定时任务 //go BackupDatabase() // 监听收到的信号 interrupt := make(chan os.Signal, 1) signal.Notify(interrupt, os.Interrupt) var done = make(chan struct{}) defer close(done) //建立websocket连接,连接基点接口 // var conn *websocket.Conn if connectWebSocket() { // 查询任务,发送请求给基点接口 go ListenToSendMsg() } for { select { case <-done: return // case t := <-ticker.C: // err := c.WriteMessage(websocket.TextMessage, []byte(t.String())) // if err != nil { // log.Println("write:", err) // return // } case <-interrupt: log.Println("interrupt") // Cleanly close the connection by sending a close message and then // waiting (with timeout) for the server to close the connection. err := wsConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) if err != nil { log.Println("write close:", err) //return } select { case <-done: case <-time.After(time.Second * 10): } //return case <-disconnect: log.Println("服务器连接已断开!") pingTimer = time.NewTicker(10 * time.Second) connectWebSocket() // for { // select { // case <-time.After(time.Second * 10): // wsConn = initWebSocket() // //开启计时器 // //initTimer() // if wsConn != nil { // log.Println("正在重连-------") // defer wsConn.Close() // ctrl.GetConnection(wsConn) // //ctrl.SendMessage(nil,conn) // //go ListenToSendMsg(conn) // //go ListenToReadMsg(conn) // goto label // } // log.Println("连接失败, 等待重新连接........") // } // } } // label: } log.Println("===============Service_down===============") //conn.Close() } //重置计时器 // func resetTimer() { // log.Println("重置计时器") // timer.Reset(timeOut) // } // 开启计时器 // func initTimer() { // log.Println("开启计时器") // timer = time.NewTimer(timeOut) // go func() { // <-timer.C // //conn.Close() // temp = true // go SetDisconnect() // }() // timer.Stop() // } // 连接基点接口服务 func connectWebSocket() bool { log.Printf("connecting to %s", gpServiceUrl) // 判断是否重复连接 if lockReconnect{ return false } lockReconnect = true //尝试连接计时器 connectTimer := time.NewTicker(5 * time.Second) stopChan := make(chan bool) defer connectTimer.Stop() for { select { case <-stopChan: fmt.Println("Stop connect try") return true case <-connectTimer.C: go func() { log.Println("try conn to GP Server") conn, _, err := websocket.DefaultDialer.Dial(gpServiceUrl, nil) if err != nil { log.Println(err) } else { stopChan <- true close(stopChan) wsConn = conn /*// 设置心跳处理事件 wsConn.SetPingHandler(func(message string) error { log.Println("ping received", message) err := wsConn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second)) if err == websocket.ErrCloseSent { return nil } else if e, ok := err.(net.Error); ok && e.Temporary() { return nil } // 更新心跳时间 heartTime = time.Now() return err })*/ log.Println("WebSocket连接状态: ", wsConn != nil) // defer wsConn.Close() // 建立基点接口连接 ctrl.GetConnection(wsConn) // 建立链接即 开始发送心跳请求 echo() //重连后更新心跳时间,避免因心跳超时再重连 heartTime = time.Now() lockReconnect = false // 监听基点接口返回的消息 go ListenToReadMsg() return } }() } } return true } // 检查是否需要重新连接基点服务端 func checkNeedConnect() bool { log.Println("check", heartTime, " ", time.Now().Sub(heartTime) > heartTimeOut) return time.Now().Sub(heartTime) > heartTimeOut } // 监听基点接口返回消息 func ListenToReadMsg() (pid int) { log.Println("监听消息>>>>>>>>>>>>>>>>>>>") pid = Goid() for { if checkNeedConnect() { connectWebSocket() //break return } n, message, err := wsConn.ReadMessage() if err != nil { log.Println("read err:", err) disconnect <- true temp = true return } log.Printf("recv: %s", message) // log.Println(n) ctrl.HandleMessage(n, message) } return } // 定时查询接口任务,给基点接口发送请求 func ListenToSendMsg() { log.Println("发送消息>>>>>>>>>>>>>>>>>>>") cron := cron.New() // 定时规则:每隔10秒执行一次 tick := "*/10 * * * * ?" err = cron.AddFunc(tick, func() { // if temp { // wsConn = initWebSocket() // if wsConn != nil { // log.Println("正在重连-------") // defer wsConn.Close() // ctrl.GetConnection(wsConn) // } // } //判断conn是否还在 todo // svc := converseService.GetConverseService(utils.DBE) // 查询任务,生成指令 reqMap := ctrl.SearchDatabase() if err != nil { log.Println("read err:", err) // 任务发送失败,,,修改任务状态,,,,提示任务执行失败 for taskid, reqEntity := range reqMap { err = ctrl.StatusModify(converseService.FAILED, taskid,reqEntity.Request) svc.SaveErrInfo(err.Error(), taskid,reqEntity.Request) } } // 发送消息 for taskid, reqEntity := range reqMap { log.Println("-reqMap-taskid:", taskid) if checkNeedConnect() { connectWebSocket() break } err1 := wsConn.WriteJSON(reqEntity) if err1 != nil { log.Println("请求发送失败",err1) // 任务发送失败,,,修改任务状态,,,,提示任务执行失败 err1 = ctrl.StatusModify(converseService.FAILED, taskid,reqEntity.Request) // 跟新异常信息 totalMsg := "请求发送失败" svc.SaveErrInfo(totalMsg, taskid,reqEntity.Request) } else { log.Println("请求发送成功") // 发送成功,修改状态 err1 = ctrl.StatusModify(converseService.SENT, taskid,reqEntity.Request) } } }) if err != nil { log.Println(err) } if !cronstart { cronstart = true cron.Start() } if temp { // todo 这部分不知是否需要修改 svc := converseService.GetConverseService(utils.DBE) sql := " update sample_storage_task set StatusCode =4 where StatusCode =1" svc.DBE.Exec(sql) } } // 定期清理任务表已完成信息 func ClearSuccessfulTask() { log.Println("清理任务定时任务启动>>>>>>>>>>>>>>>>>>>") cron := cron.New() //每天0时清除任务表 tick := "0 0 0 * * ?" err = cron.AddFunc(tick, func() { ctrl.DeleteAccomplishedTask() if err != nil { log.Println("read:", err) } }) cron.Start() } func SetDisconnect() { log.Println("设置连接断开状态") disconnect <- true } func Goid() int { defer func() { if err := recover(); err != nil { fmt.Println("panic recover:panic info:%v", err) } }() var buf [64]byte n := runtime.Stack(buf[:], false) idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))[0] id, err := strconv.Atoi(idField) if err != nil { panic(fmt.Sprintf("cannot get goroutine id: %v", err)) } return id } // 初始化基点接口服务器 func initGpServiceUrl() { gpServiceUrl = utils.Cfg.MustValue("server", "gpServiceUrl") } // 初始化连接后 开始发送心跳 func echo() { // 设置心跳处理 wsConn.SetPongHandler( func(message string) error { log.Println("pong received", message) // 更新心跳时间 heartTime = time.Now() return err }) go func() { // 设定ping for { select { case <-pingTimer.C: go func() { log.Println("ping msg") err := wsConn.WriteControl(websocket.PingMessage, []byte("Do ping"), time.Now().Add(5 * time.Second)) if err != nil { log.Println(err) } }() } } }() /*for { _, _, err := wsConn.ReadMessage() if err != nil { log.Println("read:", err) break } log.Printf("recv: %s", message) err = wsConn.WriteMessage(mt,[]byte("Do ping")) if err != nil { log.Println("write:", err) break } }*/ } // 定期清理任务表已完成信息 func BackupDatabase() { log.Println("定时备份数据库启动>>>>>>>>>>>>>>>>>>>") cron := cron.New() //每天0时备份数据库 // * * * * * command //minute hour day month week command //tick := "0 0 0 * * ?" tick := "0 0 0 * * ?" err = cron.AddFunc(tick, func() { BackupMySqlDb("") if err != nil { log.Println("数据库备份失败:", err) } }) cron.Start() } /** * * 备份MySql数据库 * @param host: 数据库地址: * @param port: 端口: * @param user: 用户名: * @param password: 密码: * @param databaseName: 需要被分的数据库名: biobank_genepoint_new * @param tableName: 需要备份的表名: * @param sqlPath: 备份SQL存储路径: /home/workservices/bank/ * @return backupPath * */ func BackupMySqlDb(tableName string) (error,string) { //定义Cmd结构体对象指针 log.Println("开始备份数据库>>>>>>>>>>>>>>>>>>>") var cmd *exec.Cmd host := beego.AppConfig.String("db::host") port := beego.AppConfig.String("db::db_port") user := beego.AppConfig.String("db::user") password := beego.AppConfig.String("db::pwd") databaseName := beego.AppConfig.String("db::name") sqlPath := "/home/workservices/bank/" //sqlPath := "D:/backup/" //在这里如果没有传输表名,那么将会备份整个数据库,否则将只备份自己传入的表 //if tableName == "" { cmd = exec.Command("mysqldump", "--column-statistics=0","-h"+host, "-P"+port, "-u"+user, "-p"+password, databaseName) /*} else { cmd = exec.Command("mysqldump", "-h"+host, "-P"+port, "-u"+user, "-p"+password, databaseName, tableName) }*/ // mysqldump --column-statistics=0 -h127.0.0.1 -P3307 -uroot -pDashoo#190801@ali biobank_genepoint_new > /home/workservices/bank/biobank_genepoint_new_20210413.sql //StdinPipe方法返回一个在命令Start后与命令标准输入关联的管道。 stdout, err := cmd.StdoutPipe() if err != nil { log.Println(err) return err,"" } if err := cmd.Start(); err != nil { log.Println(err) return err,"" } bytes, err := ioutil.ReadAll(stdout) if err != nil { log.Println(err) return err,"" } //获得一个当前的时间戳 now := time.Now().Format("20060102") var backupPath string //设置我们备份文件的名字 if tableName == "" { backupPath = sqlPath+databaseName+"_"+now+".sql" } else { backupPath = sqlPath+databaseName+"_"+tableName+"_"+now+".sql" } log.Println("定时备份数据库完成>>>>>>>>>>>>>>>>>>>",backupPath) //写入文件并设置文件权限 err = ioutil.WriteFile(backupPath, bytes, 0644) if err != nil { log.Println(err) return err,"" } return nil,backupPath }