main.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package main
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "encoding/json"
  6. "fmt"
  7. "io/ioutil"
  8. "log"
  9. "net/http"
  10. "os"
  11. "os/signal"
  12. "strconv"
  13. "strings"
  14. "syscall"
  15. "time"
  16. "reflect"
  17. "unsafe"
  18. "dashoo.cn/labsop"
  19. "dashoo.cn/utils/redis"
  20. "dashoo.cn/sg_m3_4g_mqtt/service"
  21. "github.com/eclipse/paho.mqtt.golang"
  22. "github.com/nsqio/go-nsq"
  23. )
  24. var (
  25. producer *nsq.Producer
  26. // respChan chan *nsq.ProducerTransaction
  27. )
  28. var cv service.ChannelValue
  29. //发送返回参数
  30. type ReturnSMS struct {
  31. Code int `json:"code"`
  32. Message string `json:"message"`
  33. }
  34. type TopicMessage struct {
  35. Topic string `json:"topic"`
  36. Payload string `json:"payload"`
  37. }
  38. // 收到订阅的mqtt消息处理
  39. func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
  40. // fmt.Printf("TOPIC: %s\n", msg.Topic())
  41. // fmt.Printf("MSG: % X\n", msg.Payload())
  42. SeedToControl(msg)
  43. // datapoint, errv1 := service.Writedata(tagmac, boxmac, fmt.Sprintf("% X", command), cv.GpsBaseStation, wdvalue, sd, cv.Dy, cv.T2bxh, cv.Bbdy, 0, cjtimet, cstimet, bbtimet, bbnum, cv.Mxinh, cv.Sxuliehao, isreplace, cv.InfraredBarrier, o2, cv.Wd, co2, 0, cv.Lng, cv.Lat, cachevalue.DeviceState, cv.Wdtype, cv.ElectricalPower, cv.ElectricalSupply, cv.NTCR, cv.ThreeACVA, cv.ThreeACAA, cv.ThreeACVB, cv.ThreeACAB, cv.ThreeACVC, cv.ThreeACAC, cv.ThreeACW, cv.Doorlock)
  44. // producer.Publish(service.Config.Send.Controller.Topic, msg.Payload())
  45. }
  46. func SeedToControl(msg mqtt.Message) {
  47. var tms TopicMessage
  48. fmt.Printf("TOPIC: %s\n", msg.Topic())
  49. fmt.Printf("MSG: % X\n", msg.Payload())
  50. code := strings.Split(msg.Topic(), "/")
  51. tms.Topic = code[2]
  52. fmt.Println("传感器:", code[1])
  53. if code[1] == "o2" && len(msg.Payload()) == 7 {
  54. ac := msg.Payload()
  55. O2 := Co2ppmint(ac[3:5])
  56. cv.O2 = O2 / 100
  57. cv.SimSignal = 24
  58. fmt.Println("cv.氧气浓度:", cv.O2)
  59. cv.Time = time.Now()
  60. WriteRedisAndIfluxdb(tms.Topic, cv, msg.Payload())
  61. }
  62. if code[1] == "co2" && len(msg.Payload()) == 7 {
  63. ac := msg.Payload()
  64. CO2 := Co2ppmint(ac[3:5])
  65. cv.Co2 = CO2 / 10000
  66. cv.SimSignal = 24
  67. fmt.Println("cv.二氧化碳浓度:", cv.Co2)
  68. cv.Time = time.Now()
  69. WriteRedisAndIfluxdb(tms.Topic, cv, msg.Payload())
  70. }
  71. return
  72. }
  73. //二氧化碳 氧气
  74. func Co2ppmint(command []byte) float64 {
  75. if len(command) == 2 {
  76. var x int16
  77. b_buf := bytes.NewBuffer(command)
  78. binary.Read(b_buf, binary.BigEndian, &x)
  79. c := float64(x) * 10
  80. c_str := strconv.FormatFloat(c, 'f', 2, 64)
  81. c_f64, _ := strconv.ParseFloat(c_str, 64)
  82. return c_f64
  83. }
  84. return 0
  85. }
  86. //网络信号 CSQ
  87. func CSQ(command []byte) float64 {
  88. return float64(command[0])
  89. }
  90. func BytesToString(b []byte) string {
  91. bh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
  92. sh := reflect.StringHeader{bh.Data, bh.Len}
  93. return *(*string)(unsafe.Pointer(&sh))
  94. }
  95. //写入数据库 redis
  96. func WriteRedisAndIfluxdb(tagmac string, cv service.ChannelValue, value []byte) {
  97. cachevalue, _ := labsop.GetBoxLastdata(tagmac)
  98. datapoint, errv1 := service.Writedata(tagmac, cv.BBMac, fmt.Sprintf("% X", value), cv.GpsBaseStation, cv.Temperature, cv.Humidity, cv.Voltage, cv.T2bxh, cv.BBVoltage, 0, cv.Time, cv.BBCSTime, cv.BBCSTime, cv.BBNo, cv.SimSignal, cv.SensorNo, 0, cv.InfraredBarrier, cv.O2, cv.Temperature, cv.Co2, 0, cv.Lng, cv.Lat, 0, 1, cv.ElectricalPower, cv.ElectricalSupply, cv.NTCR, cv.ThreeACVA, cv.ThreeACAA, cv.ThreeACVB, cv.ThreeACAB, cv.ThreeACVC, cv.ThreeACAC, cv.ThreeACW, cv.Doorlock)
  99. cjutc := cv.Time.Unix()
  100. if errv1 == nil && cjutc >= cachevalue.Time.Unix() {
  101. //写入成功后才写入缓存
  102. fmt.Println("----写入----", tagmac)
  103. labsop.UpdateBoxLastdata(tagmac, datapoint)
  104. }
  105. }
  106. func ApiPost(strUrl, method string, postDict interface{}) (listPtr ReturnSMS) {
  107. httpClient := &http.Client{
  108. //Transport:nil,
  109. //CheckRedirect: nil,
  110. Timeout: 6 * time.Second,
  111. }
  112. var httpReq *http.Request
  113. b, _ := json.Marshal(postDict)
  114. postBytesReader := bytes.NewReader(b)
  115. httpReq, _ = http.NewRequest(method, strUrl, postBytesReader)
  116. httpReq.Header.Add("Content-Type", "application/json")
  117. response, _ := httpClient.Do(httpReq)
  118. if response != nil {
  119. body, _ := ioutil.ReadAll(response.Body)
  120. json.Unmarshal(body, &listPtr)
  121. }
  122. return
  123. }
  124. func main() {
  125. service.Config = service.ReadAppConfig("conf/app.conf")
  126. //初始化缓存区
  127. redis.InitRedis(service.Config.Send.Redis.Poolnum, service.Config.Send.Redis.Addr)
  128. // Set up channel on which to send signal notifications.
  129. termChan := make(chan os.Signal, 1)
  130. signal.Notify(termChan, os.Interrupt, os.Kill, syscall.SIGINT, syscall.SIGTERM)
  131. nsqCfg := nsq.NewConfig()
  132. var err error
  133. service.Producernsq, err = nsq.NewProducer(service.Config.Send.Controller.Nsqdtcpaddr, nsqCfg)
  134. if err != nil {
  135. log.Fatalf("failed creating producer %s", err)
  136. }
  137. opts := mqtt.NewClientOptions()
  138. opts.AddBroker(service.Config.Receive.Mqtt.Serveraddr)
  139. opts.SetClientID(service.Config.Receive.Mqtt.Clientid)
  140. opts.SetUsername(service.Config.Receive.Mqtt.Username)
  141. opts.SetPassword(service.Config.Receive.Mqtt.Password)
  142. opts.SetDefaultPublishHandler(mqttMessageHandler)
  143. opts.SetKeepAlive(60 * time.Second)
  144. opts.SetCleanSession(false)
  145. opts.SetPingTimeout(1 * time.Second)
  146. mqttClient := mqtt.NewClient(opts)
  147. if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
  148. log.Fatal(token.Error())
  149. }
  150. if token := mqttClient.Subscribe(service.Config.Receive.Mqtt.Topic, 1, nil); token.Wait() && token.Error() != nil {
  151. log.Fatal(token.Error())
  152. }
  153. fmt.Println("服务开启")
  154. time.Sleep(6 * time.Second)
  155. // Wait for receiving a signal.
  156. <-termChan
  157. mqttClient.Disconnect(250)
  158. }