| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- package main
- import (
- "bytes"
- "encoding/binary"
- "encoding/json"
- "fmt"
- "io/ioutil"
- "log"
- "net/http"
- "os"
- "os/signal"
- "strconv"
- "strings"
- "syscall"
- "time"
- "reflect"
- "unsafe"
- "dashoo.cn/labsop"
- "dashoo.cn/utils/redis"
- "dashoo.cn/sg_m3_4g_mqtt/service"
- "github.com/eclipse/paho.mqtt.golang"
- "github.com/nsqio/go-nsq"
- )
- var (
- producer *nsq.Producer
- // respChan chan *nsq.ProducerTransaction
- )
- var cv service.ChannelValue
// ReturnSMS holds the parameters returned by a send request. ApiPost decodes
// the JSON body of its HTTP response into this type.
type ReturnSMS struct {
	// Code is read from the "code" JSON key.
	Code int `json:"code"`
	// Message is read from the "message" JSON key.
	Message string `json:"message"`
}
// TopicMessage pairs a topic name with a payload, both carried as strings and
// tagged for JSON encoding.
type TopicMessage struct {
	// Topic maps to the "topic" JSON key.
	Topic string `json:"topic"`
	// Payload maps to the "payload" JSON key.
	Payload string `json:"payload"`
}
- // 收到订阅的mqtt消息处理
- func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
- // fmt.Printf("TOPIC: %s\n", msg.Topic())
- // fmt.Printf("MSG: % X\n", msg.Payload())
- SeedToControl(msg)
- // datapoint, errv1 := service.Writedata(tagmac, boxmac, fmt.Sprintf("% X", command), cv.GpsBaseStation, wdvalue, sd, cv.Dy, cv.T2bxh, cv.Bbdy, 0, cjtimet, cstimet, bbtimet, bbnum, cv.Mxinh, cv.Sxuliehao, isreplace, cv.InfraredBarrier, o2, cv.Wd, co2, 0, cv.Lng, cv.Lat, cachevalue.DeviceState, cv.Wdtype, cv.ElectricalPower, cv.ElectricalSupply, cv.NTCR, cv.ThreeACVA, cv.ThreeACAA, cv.ThreeACVB, cv.ThreeACAB, cv.ThreeACVC, cv.ThreeACAC, cv.ThreeACW, cv.Doorlock)
- // producer.Publish(service.Config.Send.Controller.Topic, msg.Payload())
- }
- func SeedToControl(msg mqtt.Message) {
- var tms TopicMessage
- fmt.Printf("TOPIC: %s\n", msg.Topic())
- fmt.Printf("MSG: % X\n", msg.Payload())
- code := strings.Split(msg.Topic(), "/")
- tms.Topic = code[2]
- fmt.Println("传感器:", code[1])
- if code[1] == "o2" && len(msg.Payload()) == 7 {
- ac := msg.Payload()
- O2 := Co2ppmint(ac[3:5])
- cv.O2 = O2 / 100
- cv.SimSignal = 24
- fmt.Println("cv.氧气浓度:", cv.O2)
- cv.Time = time.Now()
- WriteRedisAndIfluxdb(tms.Topic, cv, msg.Payload())
- }
- if code[1] == "co2" && len(msg.Payload()) == 7 {
- ac := msg.Payload()
- CO2 := Co2ppmint(ac[3:5])
- cv.Co2 = CO2 / 10000
- cv.SimSignal = 24
- fmt.Println("cv.二氧化碳浓度:", cv.Co2)
- cv.Time = time.Now()
- WriteRedisAndIfluxdb(tms.Topic, cv, msg.Payload())
- }
- return
- }
// Co2ppmint converts a raw two-byte sensor reading (used for both the CO2 and
// the O2 sensors) into a float64.
//
// command is interpreted as a big-endian signed 16-bit integer and multiplied
// by ten. Any input that is not exactly two bytes long yields 0.
func Co2ppmint(command []byte) float64 {
	if len(command) != 2 {
		return 0
	}

	raw := int16(binary.BigEndian.Uint16(command))
	scaled := float64(raw) * 10

	// Normalise to two decimal places through a string round-trip. The
	// formatted text is always parseable, so the parse error is dropped.
	text := strconv.FormatFloat(scaled, 'f', 2, 64)
	value, _ := strconv.ParseFloat(text, 64)
	return value
}
// CSQ returns the network signal value (CSQ) carried in the first byte of
// command, converted to float64. An empty or nil command yields 0 instead of
// panicking on the index.
func CSQ(command []byte) float64 {
	if len(command) == 0 {
		return 0
	}
	return float64(command[0])
}
// BytesToString reinterprets b as a string without copying the bytes.
//
// The returned string shares b's backing memory, so the caller must not
// modify b afterwards. A nil or empty slice yields the empty string.
//
// The string header is written through a pointer to a real string variable
// rather than built as a standalone reflect.StringHeader value: the unsafe
// package only permits the header types to be used as pointers to actual
// slices or strings, because a bare header's Data field is an untracked
// uintptr that does not keep the memory alive.
func BytesToString(b []byte) string {
	var s string
	src := (*reflect.SliceHeader)(unsafe.Pointer(&b))
	dst := (*reflect.StringHeader)(unsafe.Pointer(&s))
	dst.Data = src.Data
	dst.Len = src.Len
	return s
}
- //写入数据库 redis
- func WriteRedisAndIfluxdb(tagmac string, cv service.ChannelValue, value []byte) {
- cachevalue, _ := labsop.GetBoxLastdata(tagmac)
- datapoint, errv1 := service.Writedata(tagmac, cv.BBMac, fmt.Sprintf("% X", value), cv.GpsBaseStation, cv.Temperature, cv.Humidity, cv.Voltage, cv.T2bxh, cv.BBVoltage, 0, cv.Time, cv.BBCSTime, cv.BBCSTime, cv.BBNo, cv.SimSignal, cv.SensorNo, 0, cv.InfraredBarrier, cv.O2, cv.Temperature, cv.Co2, 0, cv.Lng, cv.Lat, 0, 1, cv.ElectricalPower, cv.ElectricalSupply, cv.NTCR, cv.ThreeACVA, cv.ThreeACAA, cv.ThreeACVB, cv.ThreeACAB, cv.ThreeACVC, cv.ThreeACAC, cv.ThreeACW, cv.Doorlock)
- cjutc := cv.Time.Unix()
- if errv1 == nil && cjutc >= cachevalue.Time.Unix() {
- //写入成功后才写入缓存
- fmt.Println("----写入----", tagmac)
- labsop.UpdateBoxLastdata(tagmac, datapoint)
- }
- }
- func ApiPost(strUrl, method string, postDict interface{}) (listPtr ReturnSMS) {
- httpClient := &http.Client{
- //Transport:nil,
- //CheckRedirect: nil,
- Timeout: 6 * time.Second,
- }
- var httpReq *http.Request
- b, _ := json.Marshal(postDict)
- postBytesReader := bytes.NewReader(b)
- httpReq, _ = http.NewRequest(method, strUrl, postBytesReader)
- httpReq.Header.Add("Content-Type", "application/json")
- response, _ := httpClient.Do(httpReq)
- if response != nil {
- body, _ := ioutil.ReadAll(response.Body)
- json.Unmarshal(body, &listPtr)
- }
- return
- }
- func main() {
- service.Config = service.ReadAppConfig("conf/app.conf")
- //初始化缓存区
- redis.InitRedis(service.Config.Send.Redis.Poolnum, service.Config.Send.Redis.Addr)
- // Set up channel on which to send signal notifications.
- termChan := make(chan os.Signal, 1)
- signal.Notify(termChan, os.Interrupt, os.Kill, syscall.SIGINT, syscall.SIGTERM)
- nsqCfg := nsq.NewConfig()
- var err error
- service.Producernsq, err = nsq.NewProducer(service.Config.Send.Controller.Nsqdtcpaddr, nsqCfg)
- if err != nil {
- log.Fatalf("failed creating producer %s", err)
- }
- opts := mqtt.NewClientOptions()
- opts.AddBroker(service.Config.Receive.Mqtt.Serveraddr)
- opts.SetClientID(service.Config.Receive.Mqtt.Clientid)
- opts.SetUsername(service.Config.Receive.Mqtt.Username)
- opts.SetPassword(service.Config.Receive.Mqtt.Password)
- opts.SetDefaultPublishHandler(mqttMessageHandler)
- opts.SetKeepAlive(60 * time.Second)
- opts.SetCleanSession(false)
- opts.SetPingTimeout(1 * time.Second)
- mqttClient := mqtt.NewClient(opts)
- if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
- log.Fatal(token.Error())
- }
- if token := mqttClient.Subscribe(service.Config.Receive.Mqtt.Topic, 1, nil); token.Wait() && token.Error() != nil {
- log.Fatal(token.Error())
- }
- fmt.Println("服务开启")
- time.Sleep(6 * time.Second)
- // Wait for receiving a signal.
- <-termChan
- mqttClient.Disconnect(250)
- }
|