فهرست منبع

修改重连机制

sunmiao 4 سال پیش
والد
کامیت
b8c896ab92
2فایلهای تغییر یافته به همراه77 افزوده شده و 99 حذف شده
  1. 15 4
      src/dashoo.cn/genepoint_srv/business/test/test_test.go
  2. 62 95
      src/dashoo.cn/genepoint_srv/main.go

+ 15 - 4
src/dashoo.cn/genepoint_srv/business/test/test_test.go

@@ -28,14 +28,27 @@ const SAMPLE_STATUS_IN = 2
 const TASK_TUBE_STORING = 1
 const TASK_TUBE_RETREIVING = 2
 
-func test_Time(t *testing.T) {
+func Test_Time1(t *testing.T) {
+
+	pingTimer := time.NewTicker(5 * time.Second)
+	for {
+		select {
+		case <-pingTimer.C:
+			go func() {
+				fmt.Println("ping msg")
+			}()
+		}
+	}
+}
+
+func Test_Time2(t *testing.T) {
 
 	cron := cron.New()
 	// 定时规则:每隔10秒执行一次
 	tick := "*/2 * * * * ?"
 	err := cron.AddFunc(tick, func() {
 		fmt.Println(time.Now())
-		cron.Stop()
+		// cron.Stop()
 	})
 	if err != nil {
 		fmt.Println(err)
@@ -337,7 +350,5 @@ func test_Trim(t *testing.T) {
 		if err != nil {
 			fmt.Println(err)
 		}
-
 	}
-
 }

+ 62 - 95
src/dashoo.cn/genepoint_srv/main.go

@@ -64,16 +64,15 @@ func main() {
 
 	//建立websocket连接,连接基点接口
 	// var conn *websocket.Conn
-	if initWebSocket() {
+	if connectWebSocket() {
 		// 查询任务,发送请求给基点接口
 		go ListenToSendMsg()
 	}
 
 	for {
-
 		select {
 		case <-done:
-			//return
+			return
 
 			//		case t := <-ticker.C:
 			//			err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))
@@ -98,7 +97,7 @@ func main() {
 			//return
 		case <-disconnect:
 			log.Println("服务器连接已断开!")
-			reConnect()
+			connectWebSocket()
 
 			// for {
 			// 	select {
@@ -127,11 +126,10 @@ func main() {
 }
 
 //重置计时器
-func resetTimer() {
-	log.Println("重置计时器")
-	timer.Reset(timeOut)
-
-}
+// func resetTimer() {
+// 	log.Println("重置计时器")
+// 	timer.Reset(timeOut)
+// }
 
 // 开启计时器
 // func initTimer() {
@@ -146,99 +144,67 @@ func resetTimer() {
 // 	timer.Stop()
 // }
 
-// 初始化WebSocket,连接基点接口
-func initWebSocket() bool {
+// 连接基点接口服务
+func connectWebSocket() bool {
 	log.Printf("connecting to %s", gpServiceUrl)
 
-	wsConn, _, err := websocket.DefaultDialer.Dial(gpServiceUrl, nil)
-	if err != nil {
-		//log.Fatal("dial:", err)
-		log.Println("dial err:", err)
-		time.Sleep(5 * time.Second)
-		reConnect()
-	}
-
-	// 设置心跳
-	wsConn.SetPingHandler(func(message string) error {
-		log.Println("ping received", message)
-		err := wsConn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))
-		if err == websocket.ErrCloseSent {
-			return nil
-		} else if e, ok := err.(net.Error); ok && e.Temporary() {
-			return nil
+	//尝试连接计时器
+	connectTimer := time.NewTicker(5 * time.Second)
+	stopChan := make(chan bool)
+	defer connectTimer.Stop()
+	for {
+		select {
+		case <-stopChan:
+			fmt.Println("Stop connect try")
+			return true
+		case <-connectTimer.C:
+			go func() {
+				log.Println("try conn to GP Server")
+				conn, _, err := websocket.DefaultDialer.Dial(gpServiceUrl, nil)
+				if err != nil {
+					log.Println(err)
+				} else {
+					stopChan <- true
+					close(stopChan)
+
+					wsConn = conn
+
+					// 设置心跳处理事件
+					wsConn.SetPingHandler(func(message string) error {
+						log.Println("ping received", message)
+						err := wsConn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))
+						if err == websocket.ErrCloseSent {
+							return nil
+						} else if e, ok := err.(net.Error); ok && e.Temporary() {
+							return nil
+						}
+
+						// 更新心跳时间
+						heartTime = time.Now()
+						return err
+					})
+
+					log.Println("WebSocket连接状态: ", wsConn != nil)
+					// defer wsConn.Close()
+					// 建立基点接口连接
+					ctrl.GetConnection(wsConn)
+
+					//重连后更新心跳时间,避免因心跳超时再重连
+					heartTime = time.Now()
+					// 监听基点接口返回的消息
+					go ListenToReadMsg()
+					return
+				}
+			}()
 		}
-
-		// 更新心跳时间
-		heartTime = time.Now()
-		return err
-	})
-
-	log.Println("WebSocket连接状态: ", wsConn != nil)
-	defer wsConn.Close()
-	// 建立基点接口连接
-	ctrl.GetConnection(wsConn)
-	// 开启及时器
-	// initTimer()
-
-	// 监听基点接口返回的消息
-	go ListenToReadMsg()
-
-	return true
-
-	//定义连接失败异常
-	// var ErrCloseSent = errors.New("websocket: close sent")
-
-	// 监听心跳
-	// conn.SetPingHandler(func(appData string) error {
-	// 	log.Println("ping received", appData)
-	// 	err := conn.WriteControl(10, []byte(appData), time.Now().Add(time.Second))
-	// 	// log.Println("err:", err)
-	// 	if err == ErrCloseSent {
-	// 		log.Println("WebSocketFound-err-temp:", temp)
-	// 		temp = true
-	// 		go SetDisconnect()
-	// 		return nil
-	// 	} else if e, ok := err.(net.Error); ok && e.Temporary() {
-	// 		log.Println("WebSocketFound-ok-temp:", temp)
-	// 		if temp {
-	// 			go ListenToSendMsg(conn)
-	// 			go ListenToReadMsg(conn)
-	// 		}
-	// 		//disconnect <- false
-	// 		temp = false
-	// 		resetTimer()
-	// 		return nil
-	// 	} else {
-	// 		log.Println("WebSocketFound-ok-temp:", temp)
-	// 		if temp {
-	// 			go ListenToSendMsg(conn)
-	// 			go ListenToReadMsg(conn)
-	// 		}
-	// 		//disconnect <- false
-	// 		temp = false
-	// 		resetTimer()
-	// 		return nil
-
-	// 	}
-	// 	return err
-	// })
-
-	// return conn
-}
-
-// 重新连接websocket
-func reConnect() {
-	if lockReconnect { // 避免频繁重连
-		return
 	}
+	return true
 
-	lockReconnect = true
-	initWebSocket()
-	lockReconnect = false
 }
 
 // 检查是否需要重新连接基点服务端
 func checkNeedConnect() bool {
+	log.Println("check", heartTime, " ", time.Now().Sub(heartTime) > heartTimeOut)
 	return time.Now().Sub(heartTime) > heartTimeOut
 }
 
@@ -248,7 +214,7 @@ func ListenToReadMsg() (pid int) {
 	pid = Goid()
 	for {
 		if checkNeedConnect() {
-			reConnect()
+			connectWebSocket()
 			break
 		}
 		n, message, err := wsConn.ReadMessage()
@@ -260,6 +226,7 @@ func ListenToReadMsg() (pid int) {
 			return
 		}
 		log.Printf("recv: %s", message)
+		// log.Println(n)
 
 		ctrl.HandleMessage(n, message)
 	}
@@ -299,7 +266,7 @@ func ListenToSendMsg() {
 		for taskid, reqEntity := range reqMap {
 			log.Println("-reqMap-taskid:", taskid)
 			if checkNeedConnect() {
-				reConnect()
+				connectWebSocket()
 				break
 			}
 			err1 := wsConn.WriteJSON(reqEntity)