| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507 |
- package main
- import (
- "fmt"
- "github.com/astaxie/beego"
- "io/ioutil"
- "log"
- "os"
- "os/exec"
- "os/signal"
- "runtime"
- "strconv"
- "strings"
- "time"
- "github.com/gorilla/websocket"
- "github.com/robfig/cron"
- "dashoo.cn/genepoint_srv/business/converseService"
- "dashoo.cn/genepoint_srv/controllers/converse"
- "dashoo.cn/utils"
- )
- //const gpServiceUrl = "ws://39.105.83.226:22225/socket"
- //const gpServiceUrl = "ws://39.105.83.226:22368/socket"
- //const gpServiceUrl = "ws://fdgfghfg.oicp.net:48681/socket"
- //const gpServiceUrl = "ws://fdgfghfg.oicp.net:23248/socket"
- //var gpServiceUrl = "ws://270100d53m.wicp.vip:46538/socket"
- //var gpServiceUrl = "ws://39.105.83.226:22225/socket"
- // 基点接口服务器地址
- var gpServiceUrl string
- var temp bool
- var timer *time.Timer
- var err error
- // 任务处理控制层
- var ctrl converse.ConverseController
- // 连接断开标识
- var disconnect = make(chan bool)
- // 是否执行任务标识
- var cronstart = false
- var timeOut = 40 * time.Second
- //全局websocket连接
- var wsConn *websocket.Conn
- // 避免重复连接标识
- var lockReconnect = false
- // 连接超时时长
- var heartTimeOut = 10 * time.Second
- // 初始心跳时间
- var heartTime = time.Now()
- // 初始心跳计时器
- var pingTimer = time.NewTicker(10 * time.Second)
- func main() {
- temp = false
- // 加载配置文件中服务器地址
- utils.LoadConfig("conf/app.conf")
- // 初始化基点接口服务器
- initGpServiceUrl()
- // Initialize data.
- utils.InitDb()
- //定期清理已完成任务信息
- go ClearSuccessfulTask()
- // 备份数据库 定时任务
- //go BackupDatabase()
- // 监听收到的信号
- interrupt := make(chan os.Signal, 1)
- signal.Notify(interrupt, os.Interrupt)
- var done = make(chan struct{})
- defer close(done)
- //建立websocket连接,连接基点接口
- // var conn *websocket.Conn
- if connectWebSocket() {
- // 查询任务,发送请求给基点接口
- go ListenToSendMsg()
- }
- for {
- select {
- case <-done:
- return
- // case t := <-ticker.C:
- // err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))
- // if err != nil {
- // log.Println("write:", err)
- // return
- // }
- case <-interrupt:
- log.Println("interrupt")
- // Cleanly close the connection by sending a close message and then
- // waiting (with timeout) for the server to close the connection.
- err := wsConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
- if err != nil {
- log.Println("write close:", err)
- //return
- }
- select {
- case <-done:
- case <-time.After(time.Second * 10):
- }
- //return
- case <-disconnect:
- log.Println("服务器连接已断开!")
- pingTimer = time.NewTicker(10 * time.Second)
- connectWebSocket()
- // for {
- // select {
- // case <-time.After(time.Second * 10):
- // wsConn = initWebSocket()
- // //开启计时器
- // //initTimer()
- // if wsConn != nil {
- // log.Println("正在重连-------")
- // defer wsConn.Close()
- // ctrl.GetConnection(wsConn)
- // //ctrl.SendMessage(nil,conn)
- // //go ListenToSendMsg(conn)
- // //go ListenToReadMsg(conn)
- // goto label
- // }
- // log.Println("连接失败, 等待重新连接........")
- // }
- // }
- }
- // label:
- }
- log.Println("===============Service_down===============")
- //conn.Close()
- }
- //重置计时器
- // func resetTimer() {
- // log.Println("重置计时器")
- // timer.Reset(timeOut)
- // }
- // 开启计时器
- // func initTimer() {
- // log.Println("开启计时器")
- // timer = time.NewTimer(timeOut)
- // go func() {
- // <-timer.C
- // //conn.Close()
- // temp = true
- // go SetDisconnect()
- // }()
- // timer.Stop()
- // }
- // 连接基点接口服务
- func connectWebSocket() bool {
- log.Printf("connecting to %s", gpServiceUrl)
- // 判断是否重复连接
- if lockReconnect{
- return false
- }
- lockReconnect = true
- //尝试连接计时器
- connectTimer := time.NewTicker(5 * time.Second)
- stopChan := make(chan bool)
- defer connectTimer.Stop()
- for {
- select {
- case <-stopChan:
- fmt.Println("Stop connect try")
- return true
- case <-connectTimer.C:
- go func() {
- log.Println("try conn to GP Server")
- conn, _, err := websocket.DefaultDialer.Dial(gpServiceUrl, nil)
- if err != nil {
- log.Println(err)
- } else {
- stopChan <- true
- close(stopChan)
- wsConn = conn
- /*// 设置心跳处理事件
- wsConn.SetPingHandler(func(message string) error {
- log.Println("ping received", message)
- err := wsConn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))
- if err == websocket.ErrCloseSent {
- return nil
- } else if e, ok := err.(net.Error); ok && e.Temporary() {
- return nil
- }
- // 更新心跳时间
- heartTime = time.Now()
- return err
- })*/
- log.Println("WebSocket连接状态: ", wsConn != nil)
- // defer wsConn.Close()
- // 建立基点接口连接
- ctrl.GetConnection(wsConn)
- // 建立链接即 开始发送心跳请求
- echo()
- //重连后更新心跳时间,避免因心跳超时再重连
- heartTime = time.Now()
- lockReconnect = false
- // 监听基点接口返回的消息
- go ListenToReadMsg()
- return
- }
- }()
- }
- }
- return true
- }
- // 检查是否需要重新连接基点服务端
- func checkNeedConnect() bool {
- log.Println("check", heartTime, " ", time.Now().Sub(heartTime) > heartTimeOut)
- return time.Now().Sub(heartTime) > heartTimeOut
- }
- // 监听基点接口返回消息
- func ListenToReadMsg() (pid int) {
- log.Println("监听消息>>>>>>>>>>>>>>>>>>>")
- pid = Goid()
- for {
- if checkNeedConnect() {
- connectWebSocket()
- //break
- return
- }
- n, message, err := wsConn.ReadMessage()
- if err != nil {
- log.Println("read err:", err)
- disconnect <- true
- temp = true
- return
- }
- log.Printf("recv: %s", message)
- // log.Println(n)
- ctrl.HandleMessage(n, message)
- }
- return
- }
- // 定时查询接口任务,给基点接口发送请求
- func ListenToSendMsg() {
- log.Println("发送消息>>>>>>>>>>>>>>>>>>>")
- cron := cron.New()
- // 定时规则:每隔10秒执行一次
- tick := "*/10 * * * * ?"
- err = cron.AddFunc(tick, func() {
- // if temp {
- // wsConn = initWebSocket()
- // if wsConn != nil {
- // log.Println("正在重连-------")
- // defer wsConn.Close()
- // ctrl.GetConnection(wsConn)
- // }
- // }
- //判断conn是否还在 todo
- //
- svc := converseService.GetConverseService(utils.DBE)
- // 查询任务,生成指令
- reqMap := ctrl.SearchDatabase()
- if err != nil {
- log.Println("read err:", err)
- // 任务发送失败,,,修改任务状态,,,,提示任务执行失败
- for taskid, reqEntity := range reqMap {
- err = ctrl.StatusModify(converseService.FAILED, taskid,reqEntity.Request)
- svc.SaveErrInfo(err.Error(), taskid,reqEntity.Request)
- }
- }
- // 发送消息
- for taskid, reqEntity := range reqMap {
- log.Println("-reqMap-taskid:", taskid)
- if checkNeedConnect() {
- connectWebSocket()
- break
- }
- err1 := wsConn.WriteJSON(reqEntity)
- if err1 != nil {
- log.Println("请求发送失败",err1)
- // 任务发送失败,,,修改任务状态,,,,提示任务执行失败
- err1 = ctrl.StatusModify(converseService.FAILED, taskid,reqEntity.Request)
- // 跟新异常信息
- totalMsg := "请求发送失败"
- svc.SaveErrInfo(totalMsg, taskid,reqEntity.Request)
- } else {
- log.Println("请求发送成功")
- // 发送成功,修改状态
- err1 = ctrl.StatusModify(converseService.SENT, taskid,reqEntity.Request)
- }
- }
- })
- if err != nil {
- log.Println(err)
- }
- if !cronstart {
- cronstart = true
- cron.Start()
- }
- if temp { // todo 这部分不知是否需要修改
- svc := converseService.GetConverseService(utils.DBE)
- sql := " update sample_storage_task set StatusCode =4 where StatusCode =1"
- svc.DBE.Exec(sql)
- }
- }
- // 定期清理任务表已完成信息
- func ClearSuccessfulTask() {
- log.Println("清理任务定时任务启动>>>>>>>>>>>>>>>>>>>")
- cron := cron.New()
- //每天0时清除任务表
- tick := "0 0 0 * * ?"
- err = cron.AddFunc(tick, func() {
- ctrl.DeleteAccomplishedTask()
- if err != nil {
- log.Println("read:", err)
- }
- })
- cron.Start()
- }
- func SetDisconnect() {
- log.Println("设置连接断开状态")
- disconnect <- true
- }
- func Goid() int {
- defer func() {
- if err := recover(); err != nil {
- fmt.Println("panic recover:panic info:%v", err)
- }
- }()
- var buf [64]byte
- n := runtime.Stack(buf[:], false)
- idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))[0]
- id, err := strconv.Atoi(idField)
- if err != nil {
- panic(fmt.Sprintf("cannot get goroutine id: %v", err))
- }
- return id
- }
- // 初始化基点接口服务器
- func initGpServiceUrl() {
- gpServiceUrl = utils.Cfg.MustValue("server", "gpServiceUrl")
- }
- // 初始化连接后 开始发送心跳
- func echo() {
- // 设置心跳处理
- wsConn.SetPongHandler(
- func(message string) error {
- log.Println("pong received", message)
- // 更新心跳时间
- heartTime = time.Now()
- return err
- })
- go func() {
- // 设定ping
- for {
- select {
- case <-pingTimer.C:
- go func() {
- log.Println("ping msg")
- err := wsConn.WriteControl(websocket.PingMessage, []byte("Do ping"), time.Now().Add(5 * time.Second))
- if err != nil {
- log.Println(err)
- }
- }()
- }
- }
- }()
- /*for {
- _, _, err := wsConn.ReadMessage()
- if err != nil {
- log.Println("read:", err)
- break
- }
- log.Printf("recv: %s", message)
- err = wsConn.WriteMessage(mt,[]byte("Do ping"))
- if err != nil {
- log.Println("write:", err)
- break
- }
- }*/
- }
- // 定期清理任务表已完成信息
- func BackupDatabase() {
- log.Println("定时备份数据库启动>>>>>>>>>>>>>>>>>>>")
- cron := cron.New()
- //每天0时备份数据库
- // * * * * * command
- //minute hour day month week command
- //tick := "0 0 0 * * ?"
- tick := "0 0 0 * * ?"
- err = cron.AddFunc(tick, func() {
- BackupMySqlDb("")
- if err != nil {
- log.Println("数据库备份失败:", err)
- }
- })
- cron.Start()
- }
- /**
- *
- * 备份MySql数据库
- * @param host: 数据库地址:
- * @param port: 端口:
- * @param user: 用户名:
- * @param password: 密码:
- * @param databaseName: 需要被分的数据库名: biobank_genepoint_new
- * @param tableName: 需要备份的表名:
- * @param sqlPath: 备份SQL存储路径: /home/workservices/bank/
- * @return backupPath
- *
- */
- func BackupMySqlDb(tableName string) (error,string) {
- //定义Cmd结构体对象指针
- log.Println("开始备份数据库>>>>>>>>>>>>>>>>>>>")
- var cmd *exec.Cmd
- host := beego.AppConfig.String("db::host")
- port := beego.AppConfig.String("db::db_port")
- user := beego.AppConfig.String("db::user")
- password := beego.AppConfig.String("db::pwd")
- databaseName := beego.AppConfig.String("db::name")
- sqlPath := "/home/workservices/bank/"
- //sqlPath := "D:/backup/"
- //在这里如果没有传输表名,那么将会备份整个数据库,否则将只备份自己传入的表
- //if tableName == "" {
- cmd = exec.Command("mysqldump", "--column-statistics=0","-h"+host, "-P"+port, "-u"+user, "-p"+password, databaseName)
- /*} else {
- cmd = exec.Command("mysqldump", "-h"+host, "-P"+port, "-u"+user, "-p"+password, databaseName, tableName)
- }*/
- // mysqldump --column-statistics=0 -h127.0.0.1 -P3307 -uroot -pDashoo#190801@ali biobank_genepoint_new > /home/workservices/bank/biobank_genepoint_new_20210413.sql
- //StdinPipe方法返回一个在命令Start后与命令标准输入关联的管道。
- stdout, err := cmd.StdoutPipe()
- if err != nil {
- log.Println(err)
- return err,""
- }
- if err := cmd.Start(); err != nil {
- log.Println(err)
- return err,""
- }
- bytes, err := ioutil.ReadAll(stdout)
- if err != nil {
- log.Println(err)
- return err,""
- }
- //获得一个当前的时间戳
- now := time.Now().Format("20060102")
- var backupPath string
- //设置我们备份文件的名字
- if tableName == "" {
- backupPath = sqlPath+databaseName+"_"+now+".sql"
- } else {
- backupPath = sqlPath+databaseName+"_"+tableName+"_"+now+".sql"
- }
- log.Println("定时备份数据库完成>>>>>>>>>>>>>>>>>>>",backupPath)
- //写入文件并设置文件权限
- err = ioutil.WriteFile(backupPath, bytes, 0644)
- if err != nil {
- log.Println(err)
- return err,""
- }
- return nil,backupPath
- }
|