소스 검색

添加心跳监控

luchm 4 년 전
부모
커밋
1ae1167f02
1개의 변경된 파일74개의 추가작업 그리고 17개의 파일을 삭제
  1. 74 17
      src/dashoo.cn/genepoint_srv/main.go

+ 74 - 17
src/dashoo.cn/genepoint_srv/main.go

@@ -2,6 +2,10 @@ package main
 
 import (
 	"fmt"
+	"net"
+
+	"github.com/gorilla/websocket"
+	"github.com/robfig/cron"
 	"log"
 	"os"
 	"os/signal"
@@ -9,9 +13,7 @@ import (
 	"strconv"
 	"strings"
 	"time"
-
-	"github.com/gorilla/websocket"
-	"github.com/robfig/cron"
+	"errors"
 
 	"dashoo.cn/genepoint_srv/business/converseService"
 	"dashoo.cn/genepoint_srv/controllers/converse"
@@ -28,8 +30,10 @@ import (
 // 基点接口服务器地址
 var gpServiceUrl string
 var temp bool
-
+var timeOut = 10 * time.Second
+var timer *time.Timer
 var err error
+var conn *websocket.Conn
 var ctrl converse.ConverseController
 var disconnect = make(chan bool)
 
@@ -59,6 +63,8 @@ func main() {
 		defer conn.Close()
 		// 建立基点接口连接
 		ctrl.GetConnection(conn)
+		// 开启及时器
+		initTimer()
 		// 查询任务,发送请求给基点接口
 		go ListenToSendMsg(conn)
 		// 监听基点接口返回的消息
@@ -82,7 +88,7 @@ func main() {
 			//			}
 		case <-interrupt:
 			log.Println("interrupt")
-			temp = false
+
 			// Cleanly close the connection by sending a close message and then
 			// waiting (with timeout) for the server to close the connection.
 			err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
@@ -95,25 +101,25 @@ func main() {
 			case <-time.After(time.Second * 10):
 			}
 			//return
-		case temp = <-disconnect:
+		case  <-disconnect:
 			log.Println("服务器连接已断开!")
 
 			for {
 				select {
 				case <-time.After(time.Second * 10):
 					conn = WebSocketFound()
+					//开启计时器
+					initTimer()
 					if conn != nil {
 						defer conn.Close()
 						ctrl.GetConnection(conn)
-						go ListenToSendMsg(conn)
-						go ListenToReadMsg(conn)
+						//go ListenToSendMsg(conn)
+						//go ListenToReadMsg(conn)
 						goto label
 					}
 					log.Println("连接失败, 等待重新连接........")
 				}
-
 			}
-
 		}
 	label:
 	}
@@ -122,6 +128,26 @@ func main() {
 	//conn.Close()
 }
 
+
+//重置计时器
+func resetTimer(){
+	log.Println("重置计时器")
+	timer.Reset(timeOut)
+
+}
+// 开启计时器
+func initTimer ()  {
+	log.Println("开启计时器")
+	timer = time.NewTimer(timeOut)
+	go func() {
+		<-timer.C
+		//conn.Close()
+		go SetDisconnect()
+	}()
+	timer.Stop()
+}
+
+
 // 初始化WebSocket,连接基点接口
 func WebSocketFound() *websocket.Conn {
 	log.Printf("connecting to %s", gpServiceUrl)
@@ -130,6 +156,31 @@ func WebSocketFound() *websocket.Conn {
 		//log.Fatal("dial:", err)
 		log.Println("dial err:", err)
 	}
+
+	//定义连接失败异常
+	var ErrCloseSent = errors.New("websocket: close sent")
+
+	// 监听心跳
+	conn.SetPingHandler(   func(appData string) error {
+		log.Println("ping received", appData)
+		err := conn.WriteControl(10, []byte(appData), time.Now().Add(time.Second))
+		log.Println("err:",err)
+		if err == ErrCloseSent {
+			if !temp{
+				go ListenToSendMsg(conn)
+				go ListenToReadMsg(conn)
+			}
+			temp = true
+			go SetDisconnect()
+			return nil
+		} else if e, ok := err.(net.Error); ok && e.Temporary() {
+			temp = false
+			resetTimer()
+			return nil
+		}
+		return err
+	})
+
 	return conn
 }
 
@@ -138,18 +189,23 @@ func ListenToReadMsg(conn *websocket.Conn) (pid int) {
 	log.Println("监听消息>>>>>>>>>>>>>>>>>>>")
 	pid = Goid()
 	for {
-
 		n, message, err := conn.ReadMessage()
+
 		if err != nil {
 			log.Println("read err:", err)
 			disconnect <- true
+			temp = true
 			return
 		}
 		log.Printf("recv: %s", message)
+
 		ctrl.HandleMessage(n, message)
+
 	}
 }
 
+
+
 // 定时查询接口任务,给基点接口发送请求
 func ListenToSendMsg(conn *websocket.Conn) {
 	log.Println("发送消息>>>>>>>>>>>>>>>>>>>")
@@ -158,9 +214,6 @@ func ListenToSendMsg(conn *websocket.Conn) {
 	tick := "*/10 * * * * ?"
 	err = cron.AddFunc(tick, func() {
 		//判断conn是否还在  todo
-		if temp {
-			cron.Stop()
-		}
 		//
 		svc := converseService.GetConverseService(utils.DBE)
 
@@ -173,9 +226,7 @@ func ListenToSendMsg(conn *websocket.Conn) {
 				err = ctrl.StatusModify(converseService.FAILED, taskid)
 				svc.SaveErrInfo(err.Error(), taskid)
 			}
-
 		}
-
 		// 发送消息
 		for taskid, reqEntity := range reqMap {
 			log.Println("-reqMap-taskid:", taskid)
@@ -195,7 +246,13 @@ func ListenToSendMsg(conn *websocket.Conn) {
 			}
 		}
 	})
-	cron.Start()
+	if !temp {
+		cron.Start()
+	} else {
+		svc := converseService.GetConverseService(utils.DBE)
+		sql:=" update sample_storage_task set StatusCode =4 where StatusCode =1"
+		svc.DBE.Exec(sql)
+	}
 }
 
 // 定期清理任务表已完成信息