package main import ( "bytes" "encoding/binary" "encoding/json" "fmt" "io/ioutil" "log" "net/http" "os" "os/signal" "strconv" "strings" "syscall" "time" "reflect" "unsafe" "dashoo.cn/labsop" "dashoo.cn/utils/redis" "dashoo.cn/sg_m3_4g_mqtt/service" "github.com/eclipse/paho.mqtt.golang" "github.com/nsqio/go-nsq" ) var ( producer *nsq.Producer // respChan chan *nsq.ProducerTransaction ) var cv service.ChannelValue //发送返回参数 type ReturnSMS struct { Code int `json:"code"` Message string `json:"message"` } type TopicMessage struct { Topic string `json:"topic"` Payload string `json:"payload"` } // 收到订阅的mqtt消息处理 func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) { // fmt.Printf("TOPIC: %s\n", msg.Topic()) // fmt.Printf("MSG: % X\n", msg.Payload()) SeedToControl(msg) // datapoint, errv1 := service.Writedata(tagmac, boxmac, fmt.Sprintf("% X", command), cv.GpsBaseStation, wdvalue, sd, cv.Dy, cv.T2bxh, cv.Bbdy, 0, cjtimet, cstimet, bbtimet, bbnum, cv.Mxinh, cv.Sxuliehao, isreplace, cv.InfraredBarrier, o2, cv.Wd, co2, 0, cv.Lng, cv.Lat, cachevalue.DeviceState, cv.Wdtype, cv.ElectricalPower, cv.ElectricalSupply, cv.NTCR, cv.ThreeACVA, cv.ThreeACAA, cv.ThreeACVB, cv.ThreeACAB, cv.ThreeACVC, cv.ThreeACAC, cv.ThreeACW, cv.Doorlock) // producer.Publish(service.Config.Send.Controller.Topic, msg.Payload()) } func SeedToControl(msg mqtt.Message) { var tms TopicMessage fmt.Printf("TOPIC: %s\n", msg.Topic()) fmt.Printf("MSG: % X\n", msg.Payload()) code := strings.Split(msg.Topic(), "/") tms.Topic = code[2] fmt.Println("传感器:", code[1]) if code[1] == "o2" && len(msg.Payload()) == 7 { ac := msg.Payload() O2 := Co2ppmint(ac[3:5]) cv.O2 = O2 / 100 cv.SimSignal = 24 fmt.Println("cv.氧气浓度:", cv.O2) cv.Time = time.Now() WriteRedisAndIfluxdb(tms.Topic, cv, msg.Payload()) } if code[1] == "co2" && len(msg.Payload()) == 7 { ac := msg.Payload() CO2 := Co2ppmint(ac[3:5]) cv.Co2 = CO2 / 10000 cv.SimSignal = 24 fmt.Println("cv.二氧化碳浓度:", cv.Co2) cv.Time = time.Now() WriteRedisAndIfluxdb(tms.Topic, cv, msg.Payload()) } return } //二氧化碳 氧气 func Co2ppmint(command []byte) float64 { if len(command) == 2 { var x int16 b_buf := bytes.NewBuffer(command) binary.Read(b_buf, binary.BigEndian, &x) c := float64(x) * 10 c_str := strconv.FormatFloat(c, 'f', 2, 64) c_f64, _ := strconv.ParseFloat(c_str, 64) return c_f64 } return 0 } //网络信号 CSQ func CSQ(command []byte) float64 { return float64(command[0]) } func BytesToString(b []byte) string { bh := (*reflect.SliceHeader)(unsafe.Pointer(&b)) sh := reflect.StringHeader{bh.Data, bh.Len} return *(*string)(unsafe.Pointer(&sh)) } //写入数据库 redis func WriteRedisAndIfluxdb(tagmac string, cv service.ChannelValue, value []byte) { cachevalue, _ := labsop.GetBoxLastdata(tagmac) datapoint, errv1 := service.Writedata(tagmac, cv.BBMac, fmt.Sprintf("% X", value), cv.GpsBaseStation, cv.Temperature, cv.Humidity, cv.Voltage, cv.T2bxh, cv.BBVoltage, 0, cv.Time, cv.BBCSTime, cv.BBCSTime, cv.BBNo, cv.SimSignal, cv.SensorNo, 0, cv.InfraredBarrier, cv.O2, cv.Temperature, cv.Co2, 0, cv.Lng, cv.Lat, 0, 1, cv.ElectricalPower, cv.ElectricalSupply, cv.NTCR, cv.ThreeACVA, cv.ThreeACAA, cv.ThreeACVB, cv.ThreeACAB, cv.ThreeACVC, cv.ThreeACAC, cv.ThreeACW, cv.Doorlock) cjutc := cv.Time.Unix() if errv1 == nil && cjutc >= cachevalue.Time.Unix() { //写入成功后才写入缓存 fmt.Println("----写入----", tagmac) labsop.UpdateBoxLastdata(tagmac, datapoint) } } func ApiPost(strUrl, method string, postDict interface{}) (listPtr ReturnSMS) { httpClient := &http.Client{ //Transport:nil, //CheckRedirect: nil, Timeout: 6 * time.Second, } var httpReq *http.Request b, _ := json.Marshal(postDict) postBytesReader := bytes.NewReader(b) httpReq, _ = http.NewRequest(method, strUrl, postBytesReader) httpReq.Header.Add("Content-Type", "application/json") response, _ := httpClient.Do(httpReq) if response != nil { body, _ := ioutil.ReadAll(response.Body) json.Unmarshal(body, &listPtr) } return } func main() { service.Config = service.ReadAppConfig("conf/app.conf") //初始化缓存区 redis.InitRedis(service.Config.Send.Redis.Poolnum, service.Config.Send.Redis.Addr) // Set up channel on which to send signal notifications. termChan := make(chan os.Signal, 1) signal.Notify(termChan, os.Interrupt, os.Kill, syscall.SIGINT, syscall.SIGTERM) nsqCfg := nsq.NewConfig() var err error service.Producernsq, err = nsq.NewProducer(service.Config.Send.Controller.Nsqdtcpaddr, nsqCfg) if err != nil { log.Fatalf("failed creating producer %s", err) } opts := mqtt.NewClientOptions() opts.AddBroker(service.Config.Receive.Mqtt.Serveraddr) opts.SetClientID(service.Config.Receive.Mqtt.Clientid) opts.SetUsername(service.Config.Receive.Mqtt.Username) opts.SetPassword(service.Config.Receive.Mqtt.Password) opts.SetDefaultPublishHandler(mqttMessageHandler) opts.SetKeepAlive(60 * time.Second) opts.SetCleanSession(false) opts.SetPingTimeout(1 * time.Second) mqttClient := mqtt.NewClient(opts) if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { log.Fatal(token.Error()) } if token := mqttClient.Subscribe(service.Config.Receive.Mqtt.Topic, 1, nil); token.Wait() && token.Error() != nil { log.Fatal(token.Error()) } fmt.Println("服务开启") time.Sleep(6 * time.Second) // Wait for receiving a signal. <-termChan mqttClient.Disconnect(250) }