فهرست منبع

修改重连机制

sunmiao 4 سال پیش
والد
کامیت
cfff9c74ff
3فایلهای تغییر یافته به همراه407 افزوده شده و 352 حذف شده
  1. BIN
      src/dashoo.cn/genepoint_srv/api
  2. 248 241
      src/dashoo.cn/genepoint_srv/business/test/test_test.go
  3. 159 111
      src/dashoo.cn/genepoint_srv/main.go

BIN
src/dashoo.cn/genepoint_srv/api


+ 248 - 241
src/dashoo.cn/genepoint_srv/business/test/test_test.go

@@ -1,15 +1,17 @@
 package test
 
 import (
-	conter "dashoo.cn/genepoint_srv/controllers/converse"
-	converse "dashoo.cn/genepoint_srv/business/converseService"
-	"dashoo.cn/utils"
 	"encoding/json"
 	"fmt"
-	"time"
-
 	"strings"
 	"testing"
+	"time"
+
+	"github.com/robfig/cron"
+
+	converse "dashoo.cn/genepoint_srv/business/converseService"
+	conter "dashoo.cn/genepoint_srv/controllers/converse"
+	"dashoo.cn/utils"
 )
 
 const UNEXECUTED = 0
@@ -25,23 +27,42 @@ const SAMPLE_STATUS_OUT = 5
 const SAMPLE_STATUS_IN = 2
 const TASK_TUBE_STORING = 1
 const TASK_TUBE_RETREIVING = 2
-func  Test_Trim(t *testing.T) {
+
+func test_Time(t *testing.T) {
+
+	cron := cron.New()
+	// 定时规则:每隔10秒执行一次
+	tick := "*/2 * * * * ?"
+	err := cron.AddFunc(tick, func() {
+		fmt.Println(time.Now())
+		cron.Stop()
+	})
+	if err != nil {
+		fmt.Println(err)
+	}
+
+	cron.Start()
+
+	time.Sleep(time.Second * 10)
+	cron.Stop()
+}
+
+func test_Trim(t *testing.T) {
 
 	// 初始化数据库连接
 	utils.LoadConfig("../../conf/app.conf")
 	// Initialize data.
 	utils.InitDb()
 
-
 	// 定义请求报文
-//	var paymentData =`{"response":"tube_retrieving",
-//"result":200,"time":"2020-12-16T08:59:00Z",
-//"data":{"type":"accept","task_id":"559126787905298400_1608109140",
-//"taskMsg":[{"cu":10,"model":"whole_rack","total":1,
-//"list":[{"index":1,"rack":101,"rack_id":"GA00000623","tube":201,"tube_number":1}]},
-//{"cu":11,"model":"whole_rack","total":1,
-//"list":[{"index":1,"rack":101,"rack_id":"B20200803000508","tube":201,"tube_number":1}]}]}}`
-	var paymentData =` {
+	//	var paymentData =`{"response":"tube_retrieving",
+	//"result":200,"time":"2020-12-16T08:59:00Z",
+	//"data":{"type":"accept","task_id":"559126787905298400_1608109140",
+	//"taskMsg":[{"cu":10,"model":"whole_rack","total":1,
+	//"list":[{"index":1,"rack":101,"rack_id":"GA00000623","tube":201,"tube_number":1}]},
+	//{"cu":11,"model":"whole_rack","total":1,
+	//"list":[{"index":1,"rack":101,"rack_id":"B20200803000508","tube":201,"tube_number":1}]}]}}`
+	var paymentData = ` {
 "response":"tube_retrieving",
 "result":200,
 "time":"2018-09-15T13:45:32Z",
@@ -101,236 +122,222 @@ func  Test_Trim(t *testing.T) {
 ]
 }
 }`
-//var paymentData1 =` {
-//"response":"rack_storing",
-//"result":200,
-//"time":"2018-09-15T13:45:32Z",
-//"data":{
-//"type":"end",
-//"task_id":"426466061139845100_1608100330",
-//"is_end":false,
-//"execution_time":52,
-//"exceptions":[
-//{
-//"cu":2,
-//"codes":[
-//30305
-//]
-//}
-//],
-//"actual_data":[
-//{
-//"rack":101,
-//"tube":201,
-//"rack_id":"B20201215000208",
-//"target":{
-//"cu":1,
-//"ltu":1,
-//"group":1,
-//"unit":1,
-//"pos":1
-//},
-//"tubes":[
-//{
-//"no":1,
-//"id":"562431137612308481",
-//"exception":30305
-//},
-//{
-//"no":2,
-//"id":"562431137612308482",
-//"exception":30305
-//},
-//{
-//"no":4,
-//"id":"562431137612308483",
-//"exception":0
-//},
-//{
-//"no":7,
-//"id":"562431137612308484",
-//"exception":0
-//},
-//{
-//"no":5,
-//"id":"562431137612308485",
-//"exception":0
-//}
-//]
-//}
-//]
-//}
-//}`
-//var paymentData2 =` {
-//"response":"rack_storing",
-//"result":200,
-//"time":"2018-09-15T13:45:32Z",
-//"data":{
-//"type":"end",
-//"task_id":"426466061139845100_1608100330",
-//"is_end":true,
-//"execution_time":52,
-//"exceptions":[
-//{
-//"cu":3,
-//"codes":[
-//30305
-//]
-//}
-//],
-//"actual_data":[
-//{
-//"rack":101,
-//"tube":201,
-//"rack_id":"B20201215000209",
-//"target":{
-//"cu":1,
-//"ltu":1,
-//"group":1,
-//"unit":1,
-//"pos":1
-//},
-//"tubes":[
-//{
-//"no":2,
-//"id":"5624311376123084811",
-//"exception":30305
-//},
-//{
-//"no":8,
-//"id":"5624311376123084822",
-//"exception":30305
-//},
-//{
-//"no":16,
-//"id":"5624311376123084833",
-//"exception":0
-//},
-//{
-//"no":11,
-//"id":"5624311376123084844",
-//"exception":0
-//},
-//{
-//"no":12,
-//"id":"5624311376123084855",
-//"exception":0
-//}
-//]
-//}
-//]
-//}
-//}`
-
-var str  [1]string
-
-str[0] =paymentData
-//str[1] =paymentData1
-//str[2] =paymentData2
-
-
-
-
-
-
-
-
-
-for i:=range str{
-	fmt.Println("当前时间1",time.Now())
-	time.Sleep(10000)
-	fmt.Println("当前时间2",time.Now())
-
-
-	// 报文转换为本系统可读
-	var res converse.ResponseEntity
-	json.Unmarshal([]byte(str[i]), &res)
-
-	fmt.Println("res:",res)
-
-	var  message []byte
-
-	message,_ = json.Marshal(&str[i])
-	fmt.Println("paymentData;",str[i])
-	json.Unmarshal(message, &res)
-
-	fmt.Println("message:",message)
-	fmt.Println("res:",res)
-
-	// 获取请求参数
-	resp := res.Response
-	result := res.Result
-	taskId := strings.Split(res.Data.Task_id,"_")[0]
-	isEnd := res.Data.Is_end
-	taskType := res.Data.Type
-
-	fmt.Println("resp:",resp)
-	fmt.Println("result:",result)
-	fmt.Println("taskId:",taskId)
-	fmt.Println("isEnd:",isEnd)
-	fmt.Println("taskType:",taskType)
-
-
-	//调试临时用, 把设备信息打印忽略
-	//if resp != "report_data" {
-	//	log.Println("res:",&res)
+	//var paymentData1 =` {
+	//"response":"rack_storing",
+	//"result":200,
+	//"time":"2018-09-15T13:45:32Z",
+	//"data":{
+	//"type":"end",
+	//"task_id":"426466061139845100_1608100330",
+	//"is_end":false,
+	//"execution_time":52,
+	//"exceptions":[
+	//{
+	//"cu":2,
+	//"codes":[
+	//30305
+	//]
 	//}
-	//response := res.Response
-
-	//处理设备状态数据
-	/*if response == "report_data" {
-		converseService.RecordDeviceData(res.Data.List)
-	}*/
-
-	controller  := conter.ConverseController{}
-
-	var err error
-	//如果正常响应且任务执行成功则修改任务状态为成功
-	if result == 200 {
-		if taskType == "end" || taskType == "abnormal_end" {
-			fmt.Println(">>>>>>>>>>任务[" + taskId + "]结束(" + taskType + ")处理(是否全部结束:" + utils.ToStr(isEnd) + ")>>>>>>>>>>")
-
-			// 所有入库单下子任务已完成,根据样本申请状态(有一个不为成功则更新为失败状态)更新申请单,任务状态
-			if isEnd {
-				err = controller.StatusModifyWithDetail(taskId)
-			}
-			//若非正常结束,保存异常信息
-			if taskType == "abnormal_end" {
-				controller.SaveExcepMsg(res, taskId)
-			}
-			//处理任务下申请
-			svc := converse.GetConverseService(utils.DBE)
-			svc.ModifySampleStatusByApplyMainInfo(taskId, resp, res)
-			//this.SampleAllStatusModify(res,SUCCESS)
-		}   else if taskType == "task_activate" {
-			fmt.Println(">>>>>>>>>>修改激活状态>>>>>>>>>>")
-			if res.Data.Status == 3 {
-				err = controller.StatusModify(REJECT, taskId)
-			} else {
-				err = controller.StatusModify(ACTIVE, taskId)
+	//],
+	//"actual_data":[
+	//{
+	//"rack":101,
+	//"tube":201,
+	//"rack_id":"B20201215000208",
+	//"target":{
+	//"cu":1,
+	//"ltu":1,
+	//"group":1,
+	//"unit":1,
+	//"pos":1
+	//},
+	//"tubes":[
+	//{
+	//"no":1,
+	//"id":"562431137612308481",
+	//"exception":30305
+	//},
+	//{
+	//"no":2,
+	//"id":"562431137612308482",
+	//"exception":30305
+	//},
+	//{
+	//"no":4,
+	//"id":"562431137612308483",
+	//"exception":0
+	//},
+	//{
+	//"no":7,
+	//"id":"562431137612308484",
+	//"exception":0
+	//},
+	//{
+	//"no":5,
+	//"id":"562431137612308485",
+	//"exception":0
+	//}
+	//]
+	//}
+	//]
+	//}
+	//}`
+	//var paymentData2 =` {
+	//"response":"rack_storing",
+	//"result":200,
+	//"time":"2018-09-15T13:45:32Z",
+	//"data":{
+	//"type":"end",
+	//"task_id":"426466061139845100_1608100330",
+	//"is_end":true,
+	//"execution_time":52,
+	//"exceptions":[
+	//{
+	//"cu":3,
+	//"codes":[
+	//30305
+	//]
+	//}
+	//],
+	//"actual_data":[
+	//{
+	//"rack":101,
+	//"tube":201,
+	//"rack_id":"B20201215000209",
+	//"target":{
+	//"cu":1,
+	//"ltu":1,
+	//"group":1,
+	//"unit":1,
+	//"pos":1
+	//},
+	//"tubes":[
+	//{
+	//"no":2,
+	//"id":"5624311376123084811",
+	//"exception":30305
+	//},
+	//{
+	//"no":8,
+	//"id":"5624311376123084822",
+	//"exception":30305
+	//},
+	//{
+	//"no":16,
+	//"id":"5624311376123084833",
+	//"exception":0
+	//},
+	//{
+	//"no":11,
+	//"id":"5624311376123084844",
+	//"exception":0
+	//},
+	//{
+	//"no":12,
+	//"id":"5624311376123084855",
+	//"exception":0
+	//}
+	//]
+	//}
+	//]
+	//}
+	//}`
+
+	var str [1]string
+
+	str[0] = paymentData
+	//str[1] =paymentData1
+	//str[2] =paymentData2
+
+	for i := range str {
+		fmt.Println("当前时间1", time.Now())
+		time.Sleep(10000)
+		fmt.Println("当前时间2", time.Now())
+
+		// 报文转换为本系统可读
+		var res converse.ResponseEntity
+		json.Unmarshal([]byte(str[i]), &res)
+
+		fmt.Println("res:", res)
+
+		var message []byte
+
+		message, _ = json.Marshal(&str[i])
+		fmt.Println("paymentData;", str[i])
+		json.Unmarshal(message, &res)
+
+		fmt.Println("message:", message)
+		fmt.Println("res:", res)
+
+		// 获取请求参数
+		resp := res.Response
+		result := res.Result
+		taskId := strings.Split(res.Data.Task_id, "_")[0]
+		isEnd := res.Data.Is_end
+		taskType := res.Data.Type
+
+		fmt.Println("resp:", resp)
+		fmt.Println("result:", result)
+		fmt.Println("taskId:", taskId)
+		fmt.Println("isEnd:", isEnd)
+		fmt.Println("taskType:", taskType)
+
+		//调试临时用, 把设备信息打印忽略
+		//if resp != "report_data" {
+		//	log.Println("res:",&res)
+		//}
+		//response := res.Response
+
+		//处理设备状态数据
+		/*if response == "report_data" {
+			converseService.RecordDeviceData(res.Data.List)
+		}*/
+
+		controller := conter.ConverseController{}
+
+		var err error
+		//如果正常响应且任务执行成功则修改任务状态为成功
+		if result == 200 {
+			if taskType == "end" || taskType == "abnormal_end" {
+				fmt.Println(">>>>>>>>>>任务[" + taskId + "]结束(" + taskType + ")处理(是否全部结束:" + utils.ToStr(isEnd) + ")>>>>>>>>>>")
+
+				// 所有入库单下子任务已完成,根据样本申请状态(有一个不为成功则更新为失败状态)更新申请单,任务状态
+				if isEnd {
+					err = controller.StatusModifyWithDetail(taskId)
+				}
+				//若非正常结束,保存异常信息
+				if taskType == "abnormal_end" {
+					controller.SaveExcepMsg(res, taskId)
+				}
+				//处理任务下申请
+				svc := converse.GetConverseService(utils.DBE)
+				svc.ModifySampleStatusByApplyMainInfo(taskId, resp, res)
+				//this.SampleAllStatusModify(res,SUCCESS)
+			} else if taskType == "task_activate" {
+				fmt.Println(">>>>>>>>>>修改激活状态>>>>>>>>>>")
+				if res.Data.Status == 3 {
+					err = controller.StatusModify(REJECT, taskId)
+				} else {
+					err = controller.StatusModify(ACTIVE, taskId)
+				}
+			} else if taskType == "accept" {
+				fmt.Println(">>>>>>>>>>修改接受状态>>>>>>>>>>")
+				err = controller.StatusModify(ACCEPT, taskId)
+			} else if taskType == "running" {
+				fmt.Println(">>>>>>>>>>修改执行中状态>>>>>>>>>>")
+				err = controller.StatusModify(RUNNING, taskId)
 			}
-		} else if taskType == "accept" {
-			fmt.Println(">>>>>>>>>>修改接受状态>>>>>>>>>>")
-			err = controller.StatusModify(ACCEPT, taskId)
-		} else if taskType == "running" {
-			fmt.Println(">>>>>>>>>>修改执行中状态>>>>>>>>>>")
-			err = controller.StatusModify(RUNNING, taskId)
+		} else if result == 300 && taskType == "reject" {
+			fmt.Println("----------任务[" + taskId + "]任务拒绝----------")
+			//保存异常信息
+			controller.SaveExcepMsg(res, taskId)
+			err = controller.StatusModify(REJECT, taskId)
 		}
-	}  else if result == 300 && taskType == "reject" {
-		fmt.Println("----------任务[" + taskId + "]任务拒绝----------")
-		//保存异常信息
-		controller.SaveExcepMsg(res, taskId)
-		err = controller.StatusModify(REJECT, taskId)
-	}
-
-	if err != nil {
-		fmt.Println(err)
-	}
 
+		if err != nil {
+			fmt.Println(err)
+		}
 
+	}
 
 }
-
-
-
-}

+ 159 - 111
src/dashoo.cn/genepoint_srv/main.go

@@ -2,18 +2,17 @@ package main
 
 import (
 	"fmt"
-	"net"
-
-	"github.com/gorilla/websocket"
-	"github.com/robfig/cron"
 	"log"
+	"net"
 	"os"
 	"os/signal"
 	"runtime"
 	"strconv"
 	"strings"
 	"time"
-	"errors"
+
+	"github.com/gorilla/websocket"
+	"github.com/robfig/cron"
 
 	"dashoo.cn/genepoint_srv/business/converseService"
 	"dashoo.cn/genepoint_srv/controllers/converse"
@@ -30,13 +29,20 @@ import (
 // 基点接口服务器地址
 var gpServiceUrl string
 var temp bool
-var timeOut = 10 * time.Second
+
 var timer *time.Timer
+
 var err error
-var conn *websocket.Conn
 var ctrl converse.ConverseController
 var disconnect = make(chan bool)
-var cronstart=false
+var cronstart = false
+
+var timeOut = 10 * time.Second
+
+var wsConn *websocket.Conn //全局websocket连接
+var lockReconnect = false
+var heartTimeOut = 15 * time.Second
+var heartTime = time.Now()
 
 func main() {
 	temp = false
@@ -57,22 +63,10 @@ func main() {
 	defer close(done)
 
 	//建立websocket连接,连接基点接口
-	var conn *websocket.Conn
-	conn = WebSocketFound()
-	log.Println("WebSocket连接状态: ", conn != nil)
-	if conn != nil {
-		defer conn.Close()
-		// 建立基点接口连接
-		ctrl.GetConnection(conn)
-		// 开启及时器
-		initTimer()
+	// var conn *websocket.Conn
+	if initWebSocket() {
 		// 查询任务,发送请求给基点接口
-		go ListenToSendMsg(conn)
-		// 监听基点接口返回的消息
-		go ListenToReadMsg(conn)
-	} else {
-		//设置连接失败
-		go SetDisconnect()
+		go ListenToSendMsg()
 	}
 
 	for {
@@ -92,7 +86,7 @@ func main() {
 
 			// Cleanly close the connection by sending a close message and then
 			// waiting (with timeout) for the server to close the connection.
-			err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
+			err := wsConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
 			if err != nil {
 				log.Println("write close:", err)
 				//return
@@ -102,112 +96,162 @@ func main() {
 			case <-time.After(time.Second * 10):
 			}
 			//return
-		case  <-disconnect:
+		case <-disconnect:
 			log.Println("服务器连接已断开!")
-
-			for {
-				select {
-				case <-time.After(time.Second * 10):
-					conn = WebSocketFound()
-					//开启计时器
-					//initTimer()
-					if conn != nil {
-						log.Println("正在重连-------")
-						defer conn.Close()
-						ctrl.GetConnection(conn)
-						//ctrl.SendMessage(nil,conn)
-						//go ListenToSendMsg(conn)
-						//go ListenToReadMsg(conn)
-						goto label
-					}
-					log.Println("连接失败, 等待重新连接........")
-				}
-			}
+			reConnect()
+
+			// for {
+			// 	select {
+			// 	case <-time.After(time.Second * 10):
+			// 		wsConn = initWebSocket()
+			// 		//开启计时器
+			// 		//initTimer()
+			// 		if wsConn != nil {
+			// 			log.Println("正在重连-------")
+			// 			defer wsConn.Close()
+			// 			ctrl.GetConnection(wsConn)
+			// 			//ctrl.SendMessage(nil,conn)
+			// 			//go ListenToSendMsg(conn)
+			// 			//go ListenToReadMsg(conn)
+			// 			goto label
+			// 		}
+			// 		log.Println("连接失败, 等待重新连接........")
+			// 	}
+			// }
 		}
-	label:
+		// label:
 	}
 
 	log.Println("===============Service_down===============")
 	//conn.Close()
 }
 
-
 //重置计时器
-func resetTimer(){
+func resetTimer() {
 	log.Println("重置计时器")
 	timer.Reset(timeOut)
 
 }
-// 开启计时器
-func initTimer ()  {
-	log.Println("开启计时器")
-	timer = time.NewTimer(timeOut)
-	go func() {
-		<-timer.C
-		//conn.Close()
-		temp = true
-		go SetDisconnect()
-	}()
-	timer.Stop()
-}
 
+// 开启计时器
+// func initTimer() {
+// 	log.Println("开启计时器")
+// 	timer = time.NewTimer(timeOut)
+// 	go func() {
+// 		<-timer.C
+// 		//conn.Close()
+// 		temp = true
+// 		go SetDisconnect()
+// 	}()
+// 	timer.Stop()
+// }
 
 // 初始化WebSocket,连接基点接口
-func WebSocketFound() *websocket.Conn {
+func initWebSocket() bool {
 	log.Printf("connecting to %s", gpServiceUrl)
-	conn, _, err := websocket.DefaultDialer.Dial(gpServiceUrl, nil)
+
+	wsConn, _, err := websocket.DefaultDialer.Dial(gpServiceUrl, nil)
 	if err != nil {
 		//log.Fatal("dial:", err)
 		log.Println("dial err:", err)
+		time.Sleep(5 * time.Second)
+		reConnect()
 	}
 
-	//定义连接失败异常
-	var ErrCloseSent = errors.New("websocket: close sent")
-
-	// 监听心跳
-	conn.SetPingHandler(   func(appData string) error {
-		log.Println("ping received", appData)
-		err := conn.WriteControl(10, []byte(appData), time.Now().Add(time.Second))
-		log.Println("err:",err)
-		if err == ErrCloseSent {
-			log.Println("WebSocketFound-err-temp:",temp)
-			temp = true
-			go SetDisconnect()
+	// 设置心跳
+	wsConn.SetPingHandler(func(message string) error {
+		log.Println("ping received", message)
+		err := wsConn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))
+		if err == websocket.ErrCloseSent {
 			return nil
 		} else if e, ok := err.(net.Error); ok && e.Temporary() {
-			log.Println("WebSocketFound-ok-temp:",temp)
-			if temp{
-				go ListenToSendMsg(conn)
-				go ListenToReadMsg(conn)
-			}
-			//disconnect <- false
-			temp = false
-			resetTimer()
 			return nil
-		} else {
-			log.Println("WebSocketFound-ok-temp:",temp)
-			if temp{
-				go ListenToSendMsg(conn)
-				go ListenToReadMsg(conn)
-			}
-			//disconnect <- false
-			temp = false
-			resetTimer()
-			return nil
-
 		}
+
+		// 更新心跳时间
+		heartTime = time.Now()
 		return err
 	})
 
-	return conn
+	log.Println("WebSocket连接状态: ", wsConn != nil)
+	defer wsConn.Close()
+	// 建立基点接口连接
+	ctrl.GetConnection(wsConn)
+	// 开启及时器
+	// initTimer()
+
+	// 监听基点接口返回的消息
+	go ListenToReadMsg()
+
+	return true
+
+	//定义连接失败异常
+	// var ErrCloseSent = errors.New("websocket: close sent")
+
+	// 监听心跳
+	// conn.SetPingHandler(func(appData string) error {
+	// 	log.Println("ping received", appData)
+	// 	err := conn.WriteControl(10, []byte(appData), time.Now().Add(time.Second))
+	// 	// log.Println("err:", err)
+	// 	if err == ErrCloseSent {
+	// 		log.Println("WebSocketFound-err-temp:", temp)
+	// 		temp = true
+	// 		go SetDisconnect()
+	// 		return nil
+	// 	} else if e, ok := err.(net.Error); ok && e.Temporary() {
+	// 		log.Println("WebSocketFound-ok-temp:", temp)
+	// 		if temp {
+	// 			go ListenToSendMsg(conn)
+	// 			go ListenToReadMsg(conn)
+	// 		}
+	// 		//disconnect <- false
+	// 		temp = false
+	// 		resetTimer()
+	// 		return nil
+	// 	} else {
+	// 		log.Println("WebSocketFound-ok-temp:", temp)
+	// 		if temp {
+	// 			go ListenToSendMsg(conn)
+	// 			go ListenToReadMsg(conn)
+	// 		}
+	// 		//disconnect <- false
+	// 		temp = false
+	// 		resetTimer()
+	// 		return nil
+
+	// 	}
+	// 	return err
+	// })
+
+	// return conn
+}
+
+// 重新连接websocket
+func reConnect() {
+	if lockReconnect { // 避免频繁重连
+		return
+	}
+
+	lockReconnect = true
+	initWebSocket()
+	lockReconnect = false
+}
+
+// 检查是否需要重新连接基点服务端
+func checkNeedConnect() bool {
+	return time.Now().Sub(heartTime) > heartTimeOut
 }
 
 // 监听基点接口返回消息
-func ListenToReadMsg(conn *websocket.Conn) (pid int) {
+func ListenToReadMsg() (pid int) {
 	log.Println("监听消息>>>>>>>>>>>>>>>>>>>")
 	pid = Goid()
 	for {
-		n, message, err := conn.ReadMessage()
+		if checkNeedConnect() {
+			reConnect()
+			break
+		}
+		n, message, err := wsConn.ReadMessage()
 
 		if err != nil {
 			log.Println("read err:", err)
@@ -218,28 +262,25 @@ func ListenToReadMsg(conn *websocket.Conn) (pid int) {
 		log.Printf("recv: %s", message)
 
 		ctrl.HandleMessage(n, message)
-
 	}
+	return
 }
 
-
-
 // 定时查询接口任务,给基点接口发送请求
-func ListenToSendMsg(conn *websocket.Conn) {
-
+func ListenToSendMsg() {
 	log.Println("发送消息>>>>>>>>>>>>>>>>>>>")
 	cron := cron.New()
 	// 定时规则:每隔10秒执行一次
 	tick := "*/10 * * * * ?"
 	err = cron.AddFunc(tick, func() {
-		if(temp) {
-			conn = WebSocketFound()
-			if conn != nil {
-				log.Println("正在重连-------")
-				defer conn.Close()
-				ctrl.GetConnection(conn)
-			}
-		}
+		// if temp {
+		// 	wsConn = initWebSocket()
+		// 	if wsConn != nil {
+		// 		log.Println("正在重连-------")
+		// 		defer wsConn.Close()
+		// 		ctrl.GetConnection(wsConn)
+		// 	}
+		// }
 		//判断conn是否还在  todo
 		//
 		svc := converseService.GetConverseService(utils.DBE)
@@ -257,7 +298,11 @@ func ListenToSendMsg(conn *websocket.Conn) {
 		// 发送消息
 		for taskid, reqEntity := range reqMap {
 			log.Println("-reqMap-taskid:", taskid)
-			err1 := conn.WriteJSON(reqEntity)
+			if checkNeedConnect() {
+				reConnect()
+				break
+			}
+			err1 := wsConn.WriteJSON(reqEntity)
 			log.Println("write err----:", err1)
 			if err1 != nil {
 				log.Println("请求发送失败")
@@ -273,13 +318,16 @@ func ListenToSendMsg(conn *websocket.Conn) {
 			}
 		}
 	})
+	if err != nil {
+		log.Println(err)
+	}
 	if !cronstart {
 		cronstart = true
 		cron.Start()
 	}
-	if temp {
+	if temp { // todo 这部分不知是否需要修改
 		svc := converseService.GetConverseService(utils.DBE)
-		sql:=" update sample_storage_task set StatusCode =4 where StatusCode =1"
+		sql := " update sample_storage_task set StatusCode =4 where StatusCode =1"
 		svc.DBE.Exec(sql)
 	}
 }