main.go 12 KB


  1. package main
  2. import (
  3. "fmt"
  4. "github.com/astaxie/beego"
  5. "io/ioutil"
  6. "log"
  7. "os"
  8. "os/exec"
  9. "os/signal"
  10. "runtime"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "github.com/gorilla/websocket"
  15. "github.com/robfig/cron"
  16. "dashoo.cn/genepoint_srv/business/converseService"
  17. "dashoo.cn/genepoint_srv/controllers/converse"
  18. "dashoo.cn/utils"
  19. )
  20. //const gpServiceUrl = "ws://39.105.83.226:22225/socket"
  21. //const gpServiceUrl = "ws://39.105.83.226:22368/socket"
  22. //const gpServiceUrl = "ws://fdgfghfg.oicp.net:48681/socket"
  23. //const gpServiceUrl = "ws://fdgfghfg.oicp.net:23248/socket"
  24. //var gpServiceUrl = "ws://270100d53m.wicp.vip:46538/socket"
  25. //var gpServiceUrl = "ws://39.105.83.226:22225/socket"
  26. // 基点接口服务器地址
  27. var gpServiceUrl string
  28. var temp bool
  29. var timer *time.Timer
  30. var err error
  31. // 任务处理控制层
  32. var ctrl converse.ConverseController
  33. // 连接断开标识
  34. var disconnect = make(chan bool)
  35. // 是否执行任务标识
  36. var cronstart = false
  37. var timeOut = 40 * time.Second
  38. //全局websocket连接
  39. var wsConn *websocket.Conn
  40. // 避免重复连接标识
  41. var lockReconnect = false
  42. // 连接超时时长
  43. var heartTimeOut = 10 * time.Second
  44. // 初始心跳时间
  45. var heartTime = time.Now()
  46. // 初始心跳计时器
  47. var pingTimer = time.NewTicker(10 * time.Second)
  48. func main() {
  49. temp = false
  50. // 加载配置文件中服务器地址
  51. utils.LoadConfig("conf/app.conf")
  52. // 初始化基点接口服务器
  53. initGpServiceUrl()
  54. // Initialize data.
  55. utils.InitDb()
  56. //定期清理已完成任务信息
  57. go ClearSuccessfulTask()
  58. // 备份数据库 定时任务
  59. //go BackupDatabase()
  60. // 监听收到的信号
  61. interrupt := make(chan os.Signal, 1)
  62. signal.Notify(interrupt, os.Interrupt)
  63. var done = make(chan struct{})
  64. defer close(done)
  65. //建立websocket连接,连接基点接口
  66. // var conn *websocket.Conn
  67. if connectWebSocket() {
  68. // 查询任务,发送请求给基点接口
  69. go ListenToSendMsg()
  70. }
  71. for {
  72. select {
  73. case <-done:
  74. return
  75. // case t := <-ticker.C:
  76. // err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))
  77. // if err != nil {
  78. // log.Println("write:", err)
  79. // return
  80. // }
  81. case <-interrupt:
  82. log.Println("interrupt")
  83. // Cleanly close the connection by sending a close message and then
  84. // waiting (with timeout) for the server to close the connection.
  85. err := wsConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
  86. if err != nil {
  87. log.Println("write close:", err)
  88. //return
  89. }
  90. select {
  91. case <-done:
  92. case <-time.After(time.Second * 10):
  93. }
  94. //return
  95. case <-disconnect:
  96. log.Println("服务器连接已断开!")
  97. pingTimer = time.NewTicker(10 * time.Second)
  98. connectWebSocket()
  99. // for {
  100. // select {
  101. // case <-time.After(time.Second * 10):
  102. // wsConn = initWebSocket()
  103. // //开启计时器
  104. // //initTimer()
  105. // if wsConn != nil {
  106. // log.Println("正在重连-------")
  107. // defer wsConn.Close()
  108. // ctrl.GetConnection(wsConn)
  109. // //ctrl.SendMessage(nil,conn)
  110. // //go ListenToSendMsg(conn)
  111. // //go ListenToReadMsg(conn)
  112. // goto label
  113. // }
  114. // log.Println("连接失败, 等待重新连接........")
  115. // }
  116. // }
  117. }
  118. // label:
  119. }
  120. log.Println("===============Service_down===============")
  121. //conn.Close()
  122. }
  123. //重置计时器
  124. // func resetTimer() {
  125. // log.Println("重置计时器")
  126. // timer.Reset(timeOut)
  127. // }
  128. // 开启计时器
  129. // func initTimer() {
  130. // log.Println("开启计时器")
  131. // timer = time.NewTimer(timeOut)
  132. // go func() {
  133. // <-timer.C
  134. // //conn.Close()
  135. // temp = true
  136. // go SetDisconnect()
  137. // }()
  138. // timer.Stop()
  139. // }
  140. // 连接基点接口服务
  141. func connectWebSocket() bool {
  142. log.Printf("connecting to %s", gpServiceUrl)
  143. // 判断是否重复连接
  144. if lockReconnect{
  145. return false
  146. }
  147. lockReconnect = true
  148. //尝试连接计时器
  149. connectTimer := time.NewTicker(5 * time.Second)
  150. stopChan := make(chan bool)
  151. defer connectTimer.Stop()
  152. for {
  153. select {
  154. case <-stopChan:
  155. fmt.Println("Stop connect try")
  156. return true
  157. case <-connectTimer.C:
  158. go func() {
  159. log.Println("try conn to GP Server")
  160. conn, _, err := websocket.DefaultDialer.Dial(gpServiceUrl, nil)
  161. if err != nil {
  162. log.Println(err)
  163. } else {
  164. stopChan <- true
  165. close(stopChan)
  166. wsConn = conn
  167. /*// 设置心跳处理事件
  168. wsConn.SetPingHandler(func(message string) error {
  169. log.Println("ping received", message)
  170. err := wsConn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))
  171. if err == websocket.ErrCloseSent {
  172. return nil
  173. } else if e, ok := err.(net.Error); ok && e.Temporary() {
  174. return nil
  175. }
  176. // 更新心跳时间
  177. heartTime = time.Now()
  178. return err
  179. })*/
  180. log.Println("WebSocket连接状态: ", wsConn != nil)
  181. // defer wsConn.Close()
  182. // 建立基点接口连接
  183. ctrl.GetConnection(wsConn)
  184. // 建立链接即 开始发送心跳请求
  185. echo()
  186. //重连后更新心跳时间,避免因心跳超时再重连
  187. heartTime = time.Now()
  188. lockReconnect = false
  189. // 监听基点接口返回的消息
  190. go ListenToReadMsg()
  191. return
  192. }
  193. }()
  194. }
  195. }
  196. return true
  197. }
  198. // 检查是否需要重新连接基点服务端
  199. func checkNeedConnect() bool {
  200. log.Println("check", heartTime, " ", time.Now().Sub(heartTime) > heartTimeOut)
  201. return time.Now().Sub(heartTime) > heartTimeOut
  202. }
  203. // 监听基点接口返回消息
  204. func ListenToReadMsg() (pid int) {
  205. log.Println("监听消息>>>>>>>>>>>>>>>>>>>")
  206. pid = Goid()
  207. for {
  208. if checkNeedConnect() {
  209. connectWebSocket()
  210. //break
  211. return
  212. }
  213. n, message, err := wsConn.ReadMessage()
  214. if err != nil {
  215. log.Println("read err:", err)
  216. disconnect <- true
  217. temp = true
  218. return
  219. }
  220. log.Printf("recv: %s", message)
  221. // log.Println(n)
  222. ctrl.HandleMessage(n, message)
  223. }
  224. return
  225. }
  226. // 定时查询接口任务,给基点接口发送请求
  227. func ListenToSendMsg() {
  228. log.Println("发送消息>>>>>>>>>>>>>>>>>>>")
  229. cron := cron.New()
  230. // 定时规则:每隔10秒执行一次
  231. tick := "*/10 * * * * ?"
  232. err = cron.AddFunc(tick, func() {
  233. // if temp {
  234. // wsConn = initWebSocket()
  235. // if wsConn != nil {
  236. // log.Println("正在重连-------")
  237. // defer wsConn.Close()
  238. // ctrl.GetConnection(wsConn)
  239. // }
  240. // }
  241. //判断conn是否还在 todo
  242. //
  243. svc := converseService.GetConverseService(utils.DBE)
  244. // 查询任务,生成指令
  245. reqMap := ctrl.SearchDatabase()
  246. if err != nil {
  247. log.Println("read err:", err)
  248. // 任务发送失败,,,修改任务状态,,,,提示任务执行失败
  249. for taskid, reqEntity := range reqMap {
  250. err = ctrl.StatusModify(converseService.FAILED, taskid,reqEntity.Request)
  251. svc.SaveErrInfo(err.Error(), taskid,reqEntity.Request)
  252. }
  253. }
  254. // 发送消息
  255. for taskid, reqEntity := range reqMap {
  256. log.Println("-reqMap-taskid:", taskid)
  257. if checkNeedConnect() {
  258. connectWebSocket()
  259. break
  260. }
  261. err1 := wsConn.WriteJSON(reqEntity)
  262. if err1 != nil {
  263. log.Println("请求发送失败",err1)
  264. // 任务发送失败,,,修改任务状态,,,,提示任务执行失败
  265. err1 = ctrl.StatusModify(converseService.FAILED, taskid,reqEntity.Request)
  266. // 跟新异常信息
  267. totalMsg := "请求发送失败"
  268. svc.SaveErrInfo(totalMsg, taskid,reqEntity.Request)
  269. } else {
  270. log.Println("请求发送成功")
  271. // 发送成功,修改状态
  272. err1 = ctrl.StatusModify(converseService.SENT, taskid,reqEntity.Request)
  273. }
  274. }
  275. })
  276. if err != nil {
  277. log.Println(err)
  278. }
  279. if !cronstart {
  280. cronstart = true
  281. cron.Start()
  282. }
  283. if temp { // todo 这部分不知是否需要修改
  284. svc := converseService.GetConverseService(utils.DBE)
  285. sql := " update sample_storage_task set StatusCode =4 where StatusCode =1"
  286. svc.DBE.Exec(sql)
  287. }
  288. }
  289. // 定期清理任务表已完成信息
  290. func ClearSuccessfulTask() {
  291. log.Println("清理任务定时任务启动>>>>>>>>>>>>>>>>>>>")
  292. cron := cron.New()
  293. //每天0时清除任务表
  294. tick := "0 0 0 * * ?"
  295. err = cron.AddFunc(tick, func() {
  296. ctrl.DeleteAccomplishedTask()
  297. if err != nil {
  298. log.Println("read:", err)
  299. }
  300. })
  301. cron.Start()
  302. }
  303. func SetDisconnect() {
  304. log.Println("设置连接断开状态")
  305. disconnect <- true
  306. }
  307. func Goid() int {
  308. defer func() {
  309. if err := recover(); err != nil {
  310. fmt.Println("panic recover:panic info:%v", err)
  311. }
  312. }()
  313. var buf [64]byte
  314. n := runtime.Stack(buf[:], false)
  315. idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))[0]
  316. id, err := strconv.Atoi(idField)
  317. if err != nil {
  318. panic(fmt.Sprintf("cannot get goroutine id: %v", err))
  319. }
  320. return id
  321. }
  322. // 初始化基点接口服务器
  323. func initGpServiceUrl() {
  324. gpServiceUrl = utils.Cfg.MustValue("server", "gpServiceUrl")
  325. }
  326. // 初始化连接后 开始发送心跳
  327. func echo() {
  328. // 设置心跳处理
  329. wsConn.SetPongHandler(
  330. func(message string) error {
  331. log.Println("pong received", message)
  332. // 更新心跳时间
  333. heartTime = time.Now()
  334. return err
  335. })
  336. go func() {
  337. // 设定ping
  338. for {
  339. select {
  340. case <-pingTimer.C:
  341. go func() {
  342. log.Println("ping msg")
  343. err := wsConn.WriteControl(websocket.PingMessage, []byte("Do ping"), time.Now().Add(5 * time.Second))
  344. if err != nil {
  345. log.Println(err)
  346. }
  347. }()
  348. }
  349. }
  350. }()
  351. /*for {
  352. _, _, err := wsConn.ReadMessage()
  353. if err != nil {
  354. log.Println("read:", err)
  355. break
  356. }
  357. log.Printf("recv: %s", message)
  358. err = wsConn.WriteMessage(mt,[]byte("Do ping"))
  359. if err != nil {
  360. log.Println("write:", err)
  361. break
  362. }
  363. }*/
  364. }
  365. // 定期清理任务表已完成信息
  366. func BackupDatabase() {
  367. log.Println("定时备份数据库启动>>>>>>>>>>>>>>>>>>>")
  368. cron := cron.New()
  369. //每天0时备份数据库
  370. // * * * * * command
  371. //minute hour day month week command
  372. //tick := "0 0 0 * * ?"
  373. tick := "0 0 0 * * ?"
  374. err = cron.AddFunc(tick, func() {
  375. BackupMySqlDb("")
  376. if err != nil {
  377. log.Println("数据库备份失败:", err)
  378. }
  379. })
  380. cron.Start()
  381. }
  382. /**
  383. *
  384. * 备份MySql数据库
  385. * @param host: 数据库地址:
  386. * @param port: 端口:
  387. * @param user: 用户名:
  388. * @param password: 密码:
  389. * @param databaseName: 需要被分的数据库名: biobank_genepoint_new
  390. * @param tableName: 需要备份的表名:
  391. * @param sqlPath: 备份SQL存储路径: /home/workservices/bank/
  392. * @return backupPath
  393. *
  394. */
  395. func BackupMySqlDb(tableName string) (error,string) {
  396. //定义Cmd结构体对象指针
  397. log.Println("开始备份数据库>>>>>>>>>>>>>>>>>>>")
  398. var cmd *exec.Cmd
  399. host := beego.AppConfig.String("db::host")
  400. port := beego.AppConfig.String("db::db_port")
  401. user := beego.AppConfig.String("db::user")
  402. password := beego.AppConfig.String("db::pwd")
  403. databaseName := beego.AppConfig.String("db::name")
  404. sqlPath := "/home/workservices/bank/"
  405. //sqlPath := "D:/backup/"
  406. //在这里如果没有传输表名,那么将会备份整个数据库,否则将只备份自己传入的表
  407. //if tableName == "" {
  408. cmd = exec.Command("mysqldump", "--column-statistics=0","-h"+host, "-P"+port, "-u"+user, "-p"+password, databaseName)
  409. /*} else {
  410. cmd = exec.Command("mysqldump", "-h"+host, "-P"+port, "-u"+user, "-p"+password, databaseName, tableName)
  411. }*/
  412. // mysqldump --column-statistics=0 -h127.0.0.1 -P3307 -uroot -pDashoo#190801@ali biobank_genepoint_new > /home/workservices/bank/biobank_genepoint_new_20210413.sql
  413. //StdinPipe方法返回一个在命令Start后与命令标准输入关联的管道。
  414. stdout, err := cmd.StdoutPipe()
  415. if err != nil {
  416. log.Println(err)
  417. return err,""
  418. }
  419. if err := cmd.Start(); err != nil {
  420. log.Println(err)
  421. return err,""
  422. }
  423. bytes, err := ioutil.ReadAll(stdout)
  424. if err != nil {
  425. log.Println(err)
  426. return err,""
  427. }
  428. //获得一个当前的时间戳
  429. now := time.Now().Format("20060102")
  430. var backupPath string
  431. //设置我们备份文件的名字
  432. if tableName == "" {
  433. backupPath = sqlPath+databaseName+"_"+now+".sql"
  434. } else {
  435. backupPath = sqlPath+databaseName+"_"+tableName+"_"+now+".sql"
  436. }
  437. log.Println("定时备份数据库完成>>>>>>>>>>>>>>>>>>>",backupPath)
  438. //写入文件并设置文件权限
  439. err = ioutil.WriteFile(backupPath, bytes, 0644)
  440. if err != nil {
  441. log.Println(err)
  442. return err,""
  443. }
  444. return nil,backupPath
  445. }