| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213 |
- package main
- import (
- "bytes"
- "encoding/json"
- "fmt"
- "io/ioutil"
- "log"
- "net/http"
- "os"
- "os/signal"
- "strconv"
- "strings"
- "syscall"
- "time"
- "dashoo.cn/smartdevice_mqtt/business/trigger"
- "github.com/go-xorm/xorm"
- "dashoo.cn/utils"
- "reflect"
- "unsafe"
- "dashoo.cn/labsop"
- "dashoo.cn/utils/redis"
- "dashoo.cn/smartdevice_mqtt/service"
- "github.com/eclipse/paho.mqtt.golang"
- "github.com/nsqio/go-nsq"
- )
- var (
- producer *nsq.Producer
- // respChan chan *nsq.ProducerTransaction
- )
- var cv service.ChannelValue
- //发送返回参数
- type ReturnSMS struct {
- Code int `json:"code"`
- Message string `json:"message"`
- }
- type TopicMessage struct {
- Topic string `json:"topic"`
- Payload string `json:"payload"`
- }
- // 收到订阅的mqtt消息处理
- func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
- // fmt.Printf("TOPIC: %s\n", msg.Topic())
- // fmt.Printf("MSG: % X\n", msg.Payload())
- SeedToControl(msg)
- // datapoint, errv1 := service.Writedata(tagmac, boxmac, fmt.Sprintf("% X", command), cv.GpsBaseStation, wdvalue, sd, cv.Dy, cv.T2bxh, cv.Bbdy, 0, cjtimet, cstimet, bbtimet, bbnum, cv.Mxinh, cv.Sxuliehao, isreplace, cv.InfraredBarrier, o2, cv.Wd, co2, 0, cv.Lng, cv.Lat, cachevalue.DeviceState, cv.Wdtype, cv.ElectricalPower, cv.ElectricalSupply, cv.NTCR, cv.ThreeACVA, cv.ThreeACAA, cv.ThreeACVB, cv.ThreeACAB, cv.ThreeACVC, cv.ThreeACAC, cv.ThreeACW, cv.Doorlock)
- // producer.Publish(service.Config.Send.Controller.Topic, msg.Payload())
- }
- func SeedToControl(msg mqtt.Message) {
- var tms TopicMessage
- fmt.Printf("TOPIC: %s\n", msg.Topic())
- fmt.Printf("MSG: % X\n", msg.Payload())
- code := strings.Split(msg.Topic(), "/")
- tms.Topic = code[2]
- if len(msg.Payload()) == 1 {
- v2, _ := strconv.ParseFloat(BytesToString(msg.Payload()), 64)
- cv.Doorlock = v2
- cv.Time = time.Now()
- WriteRedisAndIfluxdb(tms.Topic, cv, msg.Payload(), utils.DBE)
- }
- // tms.Payload = fmt.Sprintf("%X", msg.Payload())
- // ActionUrl := service.Config.Send.Controller.ActionUrl
- // body := ApiPost(ActionUrl, "POST", tms)
- // fmt.Println("", body.Code, body.Message)
- return
- }
- //网络信号 CSQ
- func CSQ(command []byte) float64 {
- return float64(command[0])
- }
- func BytesToString(b []byte) string {
- bh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
- sh := reflect.StringHeader{bh.Data, bh.Len}
- return *(*string)(unsafe.Pointer(&sh))
- }
- //写入数据库 redis
- func WriteRedisAndIfluxdb(tagmac string, cv service.ChannelValue, value []byte, dbe *xorm.Engine) {
- svc := trigger.GetTriggerService(dbe)
- list := svc.GetHistoryByChannelCode(tagmac, service.Config.Send.Delay)
- for i := 0; i < len(list); i++ {
- svc.UpdateEntitybyId(list[i].Id, cv.Doorlock, list[i].CreateOn.Unix())
- }
- cachevalue, _ := labsop.GetBoxLastdata(tagmac)
- datapoint, errv1 := service.Writedata(tagmac, cv.BBMac, fmt.Sprintf("% X", value), cv.GpsBaseStation, cv.Temperature, cv.Humidity, cv.Voltage, cv.T2bxh, cv.BBVoltage, 0, cv.Time, cv.BBCSTime, cv.BBCSTime, cv.BBNo, cv.SimSignal, cv.SensorNo, 0, cv.InfraredBarrier, 0, cv.Temperature, 0, 0, cv.Lng, cv.Lat, 0, 1, cv.ElectricalPower, cv.ElectricalSupply, cv.NTCR, cv.ThreeACVA, cv.ThreeACAA, cv.ThreeACVB, cv.ThreeACAB, cv.ThreeACVC, cv.ThreeACAC, cv.ThreeACW, cv.Doorlock)
- cjutc := cv.Time.Unix()
- if errv1 == nil && cjutc >= cachevalue.Time.Unix() {
- //写入成功后才写入缓存
- fmt.Println("设备号", tagmac)
- labsop.UpdateBoxLastdata(tagmac, datapoint)
- }
- }
- func ApiPost(strUrl, method string, postDict interface{}) (listPtr ReturnSMS) {
- httpClient := &http.Client{
- //Transport:nil,
- //CheckRedirect: nil,
- Timeout: 6 * time.Second,
- }
- var httpReq *http.Request
- b, _ := json.Marshal(postDict)
- postBytesReader := bytes.NewReader(b)
- httpReq, _ = http.NewRequest(method, strUrl, postBytesReader)
- httpReq.Header.Add("Content-Type", "application/json")
- response, _ := httpClient.Do(httpReq)
- if response != nil {
- body, _ := ioutil.ReadAll(response.Body)
- json.Unmarshal(body, &listPtr)
- }
- return
- }
- func main() {
- service.Config = service.ReadAppConfig("conf/app.conf")
- //初始化缓存区
- redis.InitRedis(service.Config.Send.Redis.Poolnum, service.Config.Send.Redis.Addr)
- utils.InitDb(service.Config.Send.Db.Type, service.Config.Send.Db.Addr, service.Config.Send.Db.Db, service.Config.Send.Db.User, service.Config.Send.Db.Password)
- // Set up channel on which to send signal notifications.
- termChan := make(chan os.Signal, 1)
- signal.Notify(termChan, os.Interrupt, os.Kill, syscall.SIGINT, syscall.SIGTERM)
- nsqCfg := nsq.NewConfig()
- var err error
- service.Producernsq, err = nsq.NewProducer(service.Config.Send.Controller.Nsqdtcpaddr, nsqCfg)
- if err != nil {
- log.Fatalf("failed creating producer %s", err)
- }
- opts := mqtt.NewClientOptions()
- opts.AddBroker(service.Config.Receive.Mqtt.Serveraddr)
- opts.SetClientID(service.Config.Receive.Mqtt.Clientid)
- opts.SetUsername(service.Config.Receive.Mqtt.Username)
- opts.SetPassword(service.Config.Receive.Mqtt.Password)
- opts.SetDefaultPublishHandler(mqttMessageHandler)
- opts.SetKeepAlive(60 * time.Second)
- opts.SetCleanSession(false)
- opts.SetPingTimeout(1 * time.Second)
- mqttClient := mqtt.NewClient(opts)
- if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
- log.Fatal(token.Error())
- }
- if token := mqttClient.Subscribe(service.Config.Receive.Mqtt.Topic, 1, nil); token.Wait() && token.Error() != nil {
- log.Fatal(token.Error())
- }
- fmt.Println("服务开启")
- // ticker := time.NewTicker(time.Second * 30)
- // go func() {
- // for _ = range ticker.C {
- // str_time := time.Now().Format("2006-01-02 15:04:05")
- // // mqttClient.Publish("sg/cmd/63000105", 0, true, "SHELL:@DO1=?")
- // // time.Sleep(1 * time.Second)
- // // mqttClient.Publish("sg/cmd/64100011", 0, true, "SHELL:@DO1=?")
- // // time.Sleep(1 * time.Second)
- // // mqttClient.Publish("sg/cmd/64100012", 0, true, "SHELL:@DO1=?")
- // // // mqttClient.Publish("sg/cmd/64100010", 1, true, "SHELL:@DO1=?")
- // strUrl := "http://39.98.34.197:12001/api/remote_control/rtubuttonseedcmd?instruction=DO1=1&code=64100012&ygnick=all&ygopenid=oZ2X-weExJP7jTGlhknsxAnf12r4"
- // s := Apiget(strUrl)
- // time.Sleep(1 * time.Second)
- // strUrl1 := "http://39.98.34.197:12001/api/remote_control/rtubuttonseedcmd?instruction=DO1=1&code=64100013&ygnick=all&ygopenid=oZ2X-weExJP7jTGlhknsxAnf12r4"
- // s1 := Apiget(strUrl1)
- // time.Sleep(1 * time.Second)
- // strUrl2 := "http://39.98.34.197:12001/api/remote_control/rtubuttonseedcmd?instruction=DO1=1&code=64100014&ygnick=all&ygopenid=oZ2X-weExJP7jTGlhknsxAnf12r4"
- // s2 := Apiget(strUrl2)
- // fmt.Println("----发送----", str_time, string(s), string(s1), string(s2))
- // }
- // }()
- time.Sleep(6 * time.Second)
- // if token := seed/sg01.Unsubscribe("seed/sg01"); token.Wait() && token.Error() != nil {
- // fmt.Println(token.Error())
- // os.Exit(1)
- // }
- // Wait for receiving a signal.
- <-termChan
- mqttClient.Disconnect(250)
- }
- func Apiget(str string) (body []byte) {
- str = strings.Replace(str, " ", "%20", -1)
- response, _ := http.Get(str)
- if response != nil {
- defer response.Body.Close()
- body, _ = ioutil.ReadAll(response.Body)
- }
- //json.Unmarshal(body, &devices)
- return
- }
|