main.go 7.0 KB


  1. package main
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io/ioutil"
  7. "log"
  8. "net/http"
  9. "os"
  10. "os/signal"
  11. "strconv"
  12. "strings"
  13. "syscall"
  14. "time"
  15. "dashoo.cn/smartdevice_mqtt/business/trigger"
  16. "github.com/go-xorm/xorm"
  17. "dashoo.cn/utils"
  18. "reflect"
  19. "unsafe"
  20. "dashoo.cn/labsop"
  21. "dashoo.cn/utils/redis"
  22. "dashoo.cn/smartdevice_mqtt/service"
  23. "github.com/eclipse/paho.mqtt.golang"
  24. "github.com/nsqio/go-nsq"
  25. )
  26. var (
  27. producer *nsq.Producer
  28. // respChan chan *nsq.ProducerTransaction
  29. )
  30. var cv service.ChannelValue
  31. //发送返回参数
  32. type ReturnSMS struct {
  33. Code int `json:"code"`
  34. Message string `json:"message"`
  35. }
  36. type TopicMessage struct {
  37. Topic string `json:"topic"`
  38. Payload string `json:"payload"`
  39. }
  40. // 收到订阅的mqtt消息处理
  41. func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
  42. // fmt.Printf("TOPIC: %s\n", msg.Topic())
  43. // fmt.Printf("MSG: % X\n", msg.Payload())
  44. SeedToControl(msg)
  45. // datapoint, errv1 := service.Writedata(tagmac, boxmac, fmt.Sprintf("% X", command), cv.GpsBaseStation, wdvalue, sd, cv.Dy, cv.T2bxh, cv.Bbdy, 0, cjtimet, cstimet, bbtimet, bbnum, cv.Mxinh, cv.Sxuliehao, isreplace, cv.InfraredBarrier, o2, cv.Wd, co2, 0, cv.Lng, cv.Lat, cachevalue.DeviceState, cv.Wdtype, cv.ElectricalPower, cv.ElectricalSupply, cv.NTCR, cv.ThreeACVA, cv.ThreeACAA, cv.ThreeACVB, cv.ThreeACAB, cv.ThreeACVC, cv.ThreeACAC, cv.ThreeACW, cv.Doorlock)
  46. // producer.Publish(service.Config.Send.Controller.Topic, msg.Payload())
  47. }
  48. func SeedToControl(msg mqtt.Message) {
  49. var tms TopicMessage
  50. fmt.Printf("TOPIC: %s\n", msg.Topic())
  51. fmt.Printf("MSG: % X\n", msg.Payload())
  52. code := strings.Split(msg.Topic(), "/")
  53. tms.Topic = code[2]
  54. if len(msg.Payload()) == 1 {
  55. v2, _ := strconv.ParseFloat(BytesToString(msg.Payload()), 64)
  56. cv.Doorlock = v2
  57. cv.Time = time.Now()
  58. WriteRedisAndIfluxdb(tms.Topic, cv, msg.Payload(), utils.DBE)
  59. }
  60. // tms.Payload = fmt.Sprintf("%X", msg.Payload())
  61. // ActionUrl := service.Config.Send.Controller.ActionUrl
  62. // body := ApiPost(ActionUrl, "POST", tms)
  63. // fmt.Println("", body.Code, body.Message)
  64. return
  65. }
  66. //网络信号 CSQ
  67. func CSQ(command []byte) float64 {
  68. return float64(command[0])
  69. }
  70. func BytesToString(b []byte) string {
  71. bh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
  72. sh := reflect.StringHeader{bh.Data, bh.Len}
  73. return *(*string)(unsafe.Pointer(&sh))
  74. }
  75. //写入数据库 redis
  76. func WriteRedisAndIfluxdb(tagmac string, cv service.ChannelValue, value []byte, dbe *xorm.Engine) {
  77. svc := trigger.GetTriggerService(dbe)
  78. list := svc.GetHistoryByChannelCode(tagmac, service.Config.Send.Delay)
  79. for i := 0; i < len(list); i++ {
  80. svc.UpdateEntitybyId(list[i].Id, cv.Doorlock, list[i].CreateOn.Unix())
  81. }
  82. cachevalue, _ := labsop.GetBoxLastdata(tagmac)
  83. datapoint, errv1 := service.Writedata(tagmac, cv.BBMac, fmt.Sprintf("% X", value), cv.GpsBaseStation, cv.Temperature, cv.Humidity, cv.Voltage, cv.T2bxh, cv.BBVoltage, 0, cv.Time, cv.BBCSTime, cv.BBCSTime, cv.BBNo, cv.SimSignal, cv.SensorNo, 0, cv.InfraredBarrier, 0, cv.Temperature, 0, 0, cv.Lng, cv.Lat, 0, 1, cv.ElectricalPower, cv.ElectricalSupply, cv.NTCR, cv.ThreeACVA, cv.ThreeACAA, cv.ThreeACVB, cv.ThreeACAB, cv.ThreeACVC, cv.ThreeACAC, cv.ThreeACW, cv.Doorlock)
  84. cjutc := cv.Time.Unix()
  85. if errv1 == nil && cjutc >= cachevalue.Time.Unix() {
  86. //写入成功后才写入缓存
  87. fmt.Println("设备号", tagmac)
  88. labsop.UpdateBoxLastdata(tagmac, datapoint)
  89. }
  90. }
  91. func ApiPost(strUrl, method string, postDict interface{}) (listPtr ReturnSMS) {
  92. httpClient := &http.Client{
  93. //Transport:nil,
  94. //CheckRedirect: nil,
  95. Timeout: 6 * time.Second,
  96. }
  97. var httpReq *http.Request
  98. b, _ := json.Marshal(postDict)
  99. postBytesReader := bytes.NewReader(b)
  100. httpReq, _ = http.NewRequest(method, strUrl, postBytesReader)
  101. httpReq.Header.Add("Content-Type", "application/json")
  102. response, _ := httpClient.Do(httpReq)
  103. if response != nil {
  104. body, _ := ioutil.ReadAll(response.Body)
  105. json.Unmarshal(body, &listPtr)
  106. }
  107. return
  108. }
  109. func main() {
  110. service.Config = service.ReadAppConfig("conf/app.conf")
  111. //初始化缓存区
  112. redis.InitRedis(service.Config.Send.Redis.Poolnum, service.Config.Send.Redis.Addr)
  113. utils.InitDb(service.Config.Send.Db.Type, service.Config.Send.Db.Addr, service.Config.Send.Db.Db, service.Config.Send.Db.User, service.Config.Send.Db.Password)
  114. // Set up channel on which to send signal notifications.
  115. termChan := make(chan os.Signal, 1)
  116. signal.Notify(termChan, os.Interrupt, os.Kill, syscall.SIGINT, syscall.SIGTERM)
  117. nsqCfg := nsq.NewConfig()
  118. var err error
  119. service.Producernsq, err = nsq.NewProducer(service.Config.Send.Controller.Nsqdtcpaddr, nsqCfg)
  120. if err != nil {
  121. log.Fatalf("failed creating producer %s", err)
  122. }
  123. opts := mqtt.NewClientOptions()
  124. opts.AddBroker(service.Config.Receive.Mqtt.Serveraddr)
  125. opts.SetClientID(service.Config.Receive.Mqtt.Clientid)
  126. opts.SetUsername(service.Config.Receive.Mqtt.Username)
  127. opts.SetPassword(service.Config.Receive.Mqtt.Password)
  128. opts.SetDefaultPublishHandler(mqttMessageHandler)
  129. opts.SetKeepAlive(60 * time.Second)
  130. opts.SetCleanSession(false)
  131. opts.SetPingTimeout(1 * time.Second)
  132. mqttClient := mqtt.NewClient(opts)
  133. if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
  134. log.Fatal(token.Error())
  135. }
  136. if token := mqttClient.Subscribe(service.Config.Receive.Mqtt.Topic, 1, nil); token.Wait() && token.Error() != nil {
  137. log.Fatal(token.Error())
  138. }
  139. fmt.Println("服务开启")
  140. // ticker := time.NewTicker(time.Second * 30)
  141. // go func() {
  142. // for _ = range ticker.C {
  143. // str_time := time.Now().Format("2006-01-02 15:04:05")
  144. // // mqttClient.Publish("sg/cmd/63000105", 0, true, "SHELL:@DO1=?")
  145. // // time.Sleep(1 * time.Second)
  146. // // mqttClient.Publish("sg/cmd/64100011", 0, true, "SHELL:@DO1=?")
  147. // // time.Sleep(1 * time.Second)
  148. // // mqttClient.Publish("sg/cmd/64100012", 0, true, "SHELL:@DO1=?")
  149. // // // mqttClient.Publish("sg/cmd/64100010", 1, true, "SHELL:@DO1=?")
  150. // strUrl := "http://39.98.34.197:12001/api/remote_control/rtubuttonseedcmd?instruction=DO1=1&code=64100012&ygnick=all&ygopenid=oZ2X-weExJP7jTGlhknsxAnf12r4"
  151. // s := Apiget(strUrl)
  152. // time.Sleep(1 * time.Second)
  153. // strUrl1 := "http://39.98.34.197:12001/api/remote_control/rtubuttonseedcmd?instruction=DO1=1&code=64100013&ygnick=all&ygopenid=oZ2X-weExJP7jTGlhknsxAnf12r4"
  154. // s1 := Apiget(strUrl1)
  155. // time.Sleep(1 * time.Second)
  156. // strUrl2 := "http://39.98.34.197:12001/api/remote_control/rtubuttonseedcmd?instruction=DO1=1&code=64100014&ygnick=all&ygopenid=oZ2X-weExJP7jTGlhknsxAnf12r4"
  157. // s2 := Apiget(strUrl2)
  158. // fmt.Println("----发送----", str_time, string(s), string(s1), string(s2))
  159. // }
  160. // }()
  161. time.Sleep(6 * time.Second)
  162. // if token := seed/sg01.Unsubscribe("seed/sg01"); token.Wait() && token.Error() != nil {
  163. // fmt.Println(token.Error())
  164. // os.Exit(1)
  165. // }
  166. // Wait for receiving a signal.
  167. <-termChan
  168. mqttClient.Disconnect(250)
  169. }
  170. func Apiget(str string) (body []byte) {
  171. str = strings.Replace(str, " ", "%20", -1)
  172. response, _ := http.Get(str)
  173. if response != nil {
  174. defer response.Body.Close()
  175. body, _ = ioutil.ReadAll(response.Body)
  176. }
  177. //json.Unmarshal(body, &devices)
  178. return
  179. }