// Command main subscribes to an MQTT topic and, for every single-byte payload
// it receives, records the value as a door-lock reading via the service and
// labsop packages.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"os/signal"
	"reflect"
	"strconv"
	"strings"
	"syscall"
	"time"
	"unsafe"

	"dashoo.cn/labsop"
	"dashoo.cn/smartdevice_mqtt/business/trigger"
	"dashoo.cn/smartdevice_mqtt/service"
	"dashoo.cn/utils"
	"dashoo.cn/utils/redis"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/go-xorm/xorm"
	nsq "github.com/nsqio/go-nsq"
)

var (
	producer *nsq.Producer
	// respChan chan *nsq.ProducerTransaction
)

// cv is the channel value that SeedToControl updates and hands to
// WriteRedisAndIfluxdb. Only Doorlock and Time are ever set in this file.
var cv service.ChannelValue

// apiClient is shared by ApiPost and Apiget so that connections are reused and
// every outgoing request is bounded by the same timeout.
var apiClient = &http.Client{
	Timeout: 6 * time.Second,
}

// ReturnSMS is the JSON reply decoded by ApiPost.
type ReturnSMS struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}

// TopicMessage is the JSON shape of a topic/payload pair.
type TopicMessage struct {
	Topic   string `json:"topic"`
	Payload string `json:"payload"`
}

// mqttMessageHandler is the default publish handler registered with the MQTT
// client; it forwards every received message to SeedToControl.
func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
	// fmt.Printf("TOPIC: %s\n", msg.Topic())
	// fmt.Printf("MSG: % X\n", msg.Payload())
	SeedToControl(msg)
	// datapoint, errv1 := service.Writedata(tagmac, boxmac, fmt.Sprintf("% X", command), cv.GpsBaseStation, wdvalue, sd, cv.Dy, cv.T2bxh, cv.Bbdy, 0, cjtimet, cstimet, bbtimet, bbnum, cv.Mxinh, cv.Sxuliehao, isreplace, cv.InfraredBarrier, o2, cv.Wd, co2, 0, cv.Lng, cv.Lat, cachevalue.DeviceState, cv.Wdtype, cv.ElectricalPower, cv.ElectricalSupply, cv.NTCR, cv.ThreeACVA, cv.ThreeACAA, cv.ThreeACVB, cv.ThreeACAB, cv.ThreeACVC, cv.ThreeACAC, cv.ThreeACW, cv.Doorlock)
	// producer.Publish(service.Config.Send.Controller.Topic, msg.Payload())
}

// SeedToControl prints the message, takes the third '/'-separated segment of
// its topic as the channel code and, when the payload is exactly one byte,
// parses that byte as a decimal number, stores it in cv.Doorlock together with
// the current time, and persists it through WriteRedisAndIfluxdb.
//
// Messages whose topic has fewer than three segments are logged and dropped
// instead of panicking on the segment lookup.
func SeedToControl(msg mqtt.Message) {
	topic, payload := msg.Topic(), msg.Payload()
	fmt.Printf("TOPIC: %s\n", topic)
	fmt.Printf("MSG: % X\n", payload)

	segments := strings.Split(topic, "/")
	if len(segments) < 3 {
		log.Printf("ignoring message on topic %q: expected at least 3 '/'-separated segments", topic)
		return
	}
	channelCode := segments[2]

	if len(payload) != 1 {
		return
	}

	doorlock, err := strconv.ParseFloat(BytesToString(payload), 64)
	if err != nil {
		// The reading is still recorded with the value ParseFloat returned
		// (0 on a syntax error), as before; the failure is now visible in the log.
		log.Printf("payload % X on topic %q is not a number: %v", payload, topic, err)
	}
	cv.Doorlock = doorlock
	cv.Time = time.Now()
	WriteRedisAndIfluxdb(channelCode, cv, payload, utils.DBE)

	// tms.Payload = fmt.Sprintf("%X", msg.Payload())
	// ActionUrl := service.Config.Send.Controller.ActionUrl
	// body := ApiPost(ActionUrl, "POST", tms)
	// fmt.Println("", body.Code, body.Message)
}

// CSQ returns the first byte of command as a float64 (the original comment
// labels this the network signal, CSQ). It returns 0 when command is empty.
func CSQ(command []byte) float64 {
	if len(command) == 0 {
		return 0
	}
	return float64(command[0])
}

// BytesToString reinterprets b as a string without copying. The result shares
// b's backing array, so b must not be modified while the string is in use.
func BytesToString(b []byte) string {
	// Both headers point at real variables (b and s); the unsafe package rules
	// forbid declaring a reflect.StringHeader as a plain struct value.
	var s string
	src := (*reflect.SliceHeader)(unsafe.Pointer(&b))
	dst := (*reflect.StringHeader)(unsafe.Pointer(&s))
	dst.Data = src.Data
	dst.Len = src.Len
	return s
}

// WriteRedisAndIfluxdb persists one reading for the channel tagmac.
//
// It first updates every trigger-history entry returned for tagmac with the
// door-lock value, then writes the data point through service.Writedata, and
// finally refreshes the cached last data point. value is the raw payload; it
// is stored as a space-separated upper-case hex string.
func WriteRedisAndIfluxdb(tagmac string, cv service.ChannelValue, value []byte, dbe *xorm.Engine) {
	svc := trigger.GetTriggerService(dbe)
	list := svc.GetHistoryByChannelCode(tagmac, service.Config.Send.Delay)
	for i := range list {
		svc.UpdateEntitybyId(list[i].Id, cv.Doorlock, list[i].CreateOn.Unix())
	}

	// The lookup error is deliberately ignored: the timestamp comparison below
	// then runs against whatever value GetBoxLastdata returned.
	cachevalue, _ := labsop.GetBoxLastdata(tagmac)

	datapoint, errv1 := service.Writedata(tagmac, cv.BBMac, fmt.Sprintf("% X", value), cv.GpsBaseStation, cv.Temperature, cv.Humidity, cv.Voltage, cv.T2bxh, cv.BBVoltage, 0, cv.Time, cv.BBCSTime, cv.BBCSTime, cv.BBNo, cv.SimSignal, cv.SensorNo, 0, cv.InfraredBarrier, 0, cv.Temperature, 0, 0, cv.Lng, cv.Lat, 0, 1, cv.ElectricalPower, cv.ElectricalSupply, cv.NTCR, cv.ThreeACVA, cv.ThreeACAA, cv.ThreeACVB, cv.ThreeACAB, cv.ThreeACVC, cv.ThreeACAC, cv.ThreeACW, cv.Doorlock)

	cjutc := cv.Time.Unix()
	if errv1 == nil && cjutc >= cachevalue.Time.Unix() {
		// Update the cache only after a successful write, and only when this
		// reading is not older than the cached one.
		fmt.Println("设备号", tagmac)
		labsop.UpdateBoxLastdata(tagmac, datapoint)
	}
}

// ApiPost JSON-encodes postDict, sends it to strUrl using the given HTTP
// method with a Content-Type of application/json, and decodes the JSON reply
// into a ReturnSMS.
//
// Failures are logged. The zero ReturnSMS is returned when the request cannot
// be built, sent or read; a reply that fails to decode yields whatever fields
// were decoded before the error.
func ApiPost(strUrl, method string, postDict interface{}) (listPtr ReturnSMS) {
	payload, err := json.Marshal(postDict)
	if err != nil {
		log.Printf("ApiPost: encoding request body: %v", err)
		return listPtr
	}

	req, err := http.NewRequest(method, strUrl, bytes.NewReader(payload))
	if err != nil {
		log.Printf("ApiPost: building %s request for %q: %v", method, strUrl, err)
		return listPtr
	}
	req.Header.Add("Content-Type", "application/json")

	resp, err := apiClient.Do(req)
	if err != nil {
		log.Printf("ApiPost: sending request: %v", err)
		return listPtr
	}
	// Closing the body releases the underlying connection.
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Printf("ApiPost: reading response: %v", err)
		return listPtr
	}
	if err := json.Unmarshal(body, &listPtr); err != nil {
		log.Printf("ApiPost: decoding response: %v", err)
	}
	return listPtr
}

func main() {
	service.Config = service.ReadAppConfig("conf/app.conf")

	// Initialise the cache (redis) and the database.
	redis.InitRedis(service.Config.Send.Redis.Poolnum, service.Config.Send.Redis.Addr)
	utils.InitDb(service.Config.Send.Db.Type, service.Config.Send.Db.Addr, service.Config.Send.Db.Db, service.Config.Send.Db.User, service.Config.Send.Db.Password)

	// Set up channel on which to send signal notifications. SIGKILL cannot be
	// caught, so only interrupt and terminate are requested.
	termChan := make(chan os.Signal, 1)
	signal.Notify(termChan, os.Interrupt, syscall.SIGTERM)

	nsqCfg := nsq.NewConfig()
	var err error
	service.Producernsq, err = nsq.NewProducer(service.Config.Send.Controller.Nsqdtcpaddr, nsqCfg)
	if err != nil {
		log.Fatalf("failed creating producer %s", err)
	}

	opts := mqtt.NewClientOptions()
	opts.AddBroker(service.Config.Receive.Mqtt.Serveraddr)
	opts.SetClientID(service.Config.Receive.Mqtt.Clientid)
	opts.SetUsername(service.Config.Receive.Mqtt.Username)
	opts.SetPassword(service.Config.Receive.Mqtt.Password)
	opts.SetDefaultPublishHandler(mqttMessageHandler)
	opts.SetKeepAlive(60 * time.Second)
	opts.SetCleanSession(false)
	opts.SetPingTimeout(1 * time.Second)

	mqttClient := mqtt.NewClient(opts)
	if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
		log.Fatal(token.Error())
	}
	if token := mqttClient.Subscribe(service.Config.Receive.Mqtt.Topic, 1, nil); token.Wait() && token.Error() != nil {
		log.Fatal(token.Error())
	}
	fmt.Println("服务开启")

	// ticker := time.NewTicker(time.Second * 30)
	// go func() {
	// 	for _ = range ticker.C {
	// 		str_time := time.Now().Format("2006-01-02 15:04:05")
	// 		// mqttClient.Publish("sg/cmd/63000105", 0, true, "SHELL:@DO1=?")
	// 		// time.Sleep(1 * time.Second)
	// 		// mqttClient.Publish("sg/cmd/64100011", 0, true, "SHELL:@DO1=?")
	// 		// time.Sleep(1 * time.Second)
	// 		// mqttClient.Publish("sg/cmd/64100012", 0, true, "SHELL:@DO1=?")
	// 		// // mqttClient.Publish("sg/cmd/64100010", 1, true, "SHELL:@DO1=?")
	// 		strUrl := "http://39.98.34.197:12001/api/remote_control/rtubuttonseedcmd?instruction=DO1=1&code=64100012&ygnick=all&ygopenid=oZ2X-weExJP7jTGlhknsxAnf12r4"
	// 		s := Apiget(strUrl)
	// 		time.Sleep(1 * time.Second)
	// 		strUrl1 := "http://39.98.34.197:12001/api/remote_control/rtubuttonseedcmd?instruction=DO1=1&code=64100013&ygnick=all&ygopenid=oZ2X-weExJP7jTGlhknsxAnf12r4"
	// 		s1 := Apiget(strUrl1)
	// 		time.Sleep(1 * time.Second)
	// 		strUrl2 := "http://39.98.34.197:12001/api/remote_control/rtubuttonseedcmd?instruction=DO1=1&code=64100014&ygnick=all&ygopenid=oZ2X-weExJP7jTGlhknsxAnf12r4"
	// 		s2 := Apiget(strUrl2)
	// 		fmt.Println("----发送----", str_time, string(s), string(s1), string(s2))
	// 	}
	// }()

	// if token := seed/sg01.Unsubscribe("seed/sg01"); token.Wait() && token.Error() != nil {
	// 	fmt.Println(token.Error())
	// 	os.Exit(1)
	// }

	// Wait for receiving a signal.
	<-termChan
	mqttClient.Disconnect(250)
}

// Apiget issues a GET request to str, with every space replaced by "%20", and
// returns the response body. It returns nil when the request fails; failures
// are logged.
func Apiget(str string) (body []byte) {
	str = strings.Replace(str, " ", "%20", -1)

	resp, err := apiClient.Get(str)
	if err != nil {
		log.Printf("Apiget: requesting %q: %v", str, err)
		return nil
	}
	defer resp.Body.Close()

	body, err = ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Printf("Apiget: reading response: %v", err)
	}
	//json.Unmarshal(body, &devices)
	return body
}