main.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. package main
  2. import (
  3. "dashoo.cn/api/business/converseService"
  4. "dashoo.cn/api/controllers/converse"
  5. "dashoo.cn/utils"
  6. "fmt"
  7. "github.com/robfig/cron"
  8. "log"
  9. "os"
  10. "os/signal"
  11. "runtime"
  12. "strconv"
  13. "strings"
  14. "time"
  15. "github.com/gorilla/websocket"
  16. )
  17. //const gpServiceUrl = "ws://39.105.83.226:22225/socket"
  18. //const gpServiceUrl = "ws://39.105.83.226:22368/socket"
  19. //const gpServiceUrl = "ws://fdgfghfg.oicp.net:48681/socket"
  20. //const gpServiceUrl = "ws://fdgfghfg.oicp.net:23248/socket"
  21. const gpServiceUrl = "ws://270100d53m.wicp.vip:46538/socket"
  22. //var conn *websocket.Conn
  23. var err error
  24. var ctrl converse.ConverseController
  25. var disconnect = make(chan bool)
  26. func main() {
  27. utils.LoadConfig("conf/app.conf")
  28. // Initialize data.
  29. utils.InitDb()
  30. //定期清理已完成任务信息
  31. go ClearSuccessfulTask()
  32. interrupt := make(chan os.Signal, 1)
  33. signal.Notify(interrupt, os.Interrupt)
  34. var done = make(chan struct{})
  35. defer close(done)
  36. //建立websocket连接
  37. var conn *websocket.Conn
  38. conn = WebSocketFound()
  39. log.Println("连接成功: ", conn!=nil)
  40. if conn != nil {
  41. defer conn.Close()
  42. //建立连接
  43. ctrl.GetConnection(conn)
  44. //发送请求
  45. go ListenToSendMsg(conn)
  46. //监听消息
  47. go ListenToReadMsg(conn)
  48. }else{
  49. //设置连接失败
  50. go SetDisconnect()
  51. }
  52. for {
  53. select {
  54. case <-done:
  55. //return
  56. // case t := <-ticker.C:
  57. // err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))
  58. // if err != nil {
  59. // log.Println("write:", err)
  60. // return
  61. // }
  62. case <-interrupt:
  63. log.Println("interrupt")
  64. // Cleanly close the connection by sending a close message and then
  65. // waiting (with timeout) for the server to close the connection.
  66. err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
  67. if err != nil {
  68. log.Println("write close:", err)
  69. //return
  70. }
  71. select {
  72. case <-done:
  73. case <-time.After(time.Second*10):
  74. }
  75. //return
  76. case <-disconnect:
  77. log.Println("服务器连接已断开!")
  78. for {
  79. select{
  80. case <-time.After(time.Second*10):
  81. conn = WebSocketFound()
  82. if conn != nil {
  83. defer conn.Close()
  84. ctrl.GetConnection(conn)
  85. go ListenToSendMsg(conn)
  86. go ListenToReadMsg(conn)
  87. goto label
  88. }
  89. log.Println("连接失败, 等待重新连接........")
  90. }
  91. }
  92. }
  93. label:
  94. }
  95. log.Println("===============Service_down===============")
  96. //conn.Close()
  97. }
  98. func WebSocketFound() *websocket.Conn{
  99. log.Printf("connecting to %s", gpServiceUrl)
  100. conn, _, err := websocket.DefaultDialer.Dial(gpServiceUrl, nil)
  101. if err != nil {
  102. //log.Fatal("dial:", err)
  103. log.Println("dial err:", err)
  104. }
  105. return conn
  106. }
  107. // 监听消息
  108. func ListenToReadMsg(conn *websocket.Conn) (pid int){
  109. log.Println("监听消息>>>>>>>>>>>>>>>>>>>")
  110. pid = Goid()
  111. for {
  112. n, message, err := conn.ReadMessage()
  113. if err != nil {
  114. log.Println("read err:", err)
  115. disconnect <- true
  116. return
  117. }
  118. log.Printf("recv: %s", message)
  119. ctrl.HandleMessage(n,message)
  120. }
  121. }
  122. //发送请求
  123. func ListenToSendMsg(conn *websocket.Conn){
  124. log.Println("发送消息>>>>>>>>>>>>>>>>>>>")
  125. cron := cron.New()
  126. tick := "*/5 * * * * ?"
  127. err = cron.AddFunc(tick, func() {
  128. //如果已连接则发送任务
  129. reqMap := ctrl.SearchDatabase()
  130. if err != nil {
  131. log.Println("read err:", err)
  132. }
  133. //判断conn是否还在 todo
  134. for taskId,reqEntity := range reqMap {
  135. err := conn.WriteJSON(reqEntity)
  136. if err != nil {
  137. log.Println("write err:", err)
  138. }else{
  139. err = ctrl.StatusModify(converseService.SENT, taskId)
  140. }
  141. }
  142. })
  143. cron.Start()
  144. }
  145. //定期清理任务表已完成信息
  146. func ClearSuccessfulTask() {
  147. log.Println("清理任务定时任务启动>>>>>>>>>>>>>>>>>>>")
  148. cron := cron.New()
  149. //每天0时清除任务表
  150. tick := "0 0 0 * * ?"
  151. err = cron.AddFunc(tick, func() {
  152. ctrl.DeleteAccomplishedTask()
  153. if err != nil {
  154. log.Println("read:", err)
  155. }
  156. })
  157. cron.Start()
  158. }
  159. func SetDisconnect(){
  160. log.Println("设置连接断开状态")
  161. disconnect <- true
  162. }
  // Goid returns the id of the calling goroutine, parsed from the header line
  // of its stack trace ("goroutine N [...]"). If the id cannot be extracted,
  // the problem is printed to standard output and 0 is returned.
  func Goid() int {
  	var buf [64]byte
  	n := runtime.Stack(buf[:], false)
  	// The header starts with "goroutine <id> "; the first field after the
  	// prefix is the numeric id.
  	fields := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))
  	if len(fields) == 0 {
  		fmt.Printf("panic recover:panic info:%v\n", "cannot get goroutine id: empty stack header")
  		return 0
  	}
  	id, err := strconv.Atoi(fields[0])
  	if err != nil {
  		fmt.Printf("panic recover:panic info:%v\n", fmt.Sprintf("cannot get goroutine id: %v", err))
  		return 0
  	}
  	return id
  }