Selaa lähdekoodia

智能锁提交

all 4 vuotta sitten
commit
92b8c88a5e

+ 21 - 0
sg_m3_4g_mqtt/conf/app.conf

@@ -0,0 +1,21 @@
+[receive]
+description="从mqtt接收数据"
+
+[receive.mqtt]
+clientid = "rtu002"
+username = "sg1"
+password = "dashoopwd"
+serveraddr = "tcp://39.98.34.197:1883"
+topic = "m3/#"
+
+[send]
+description="数据转存到nsq"
+
+[send.controller]
+actionUrl = "http://192.168.0.181:8091/api/remote_control/rtuudate"
+Nsqdtcpaddr = "39.98.34.197:9150"
+topic = "blackboxd2"
+
+[send.redis]
+addr = "39.98.34.197:26379"
+poolnum = 50

+ 186 - 0
sg_m3_4g_mqtt/main.go

@@ -0,0 +1,186 @@
+package main
+
+import (
+	"bytes"
+	"encoding/binary"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"log"
+	"net/http"
+	"os"
+	"os/signal"
+	"strconv"
+	"strings"
+	"syscall"
+	"time"
+
+	"reflect"
+	"unsafe"
+
+	"dashoo.cn/labsop"
+	"dashoo.cn/utils/redis"
+
+	"dashoo.cn/sg_m3_4g_mqtt/service"
+	"github.com/eclipse/paho.mqtt.golang"
+	"github.com/nsqio/go-nsq"
+)
+
+var (
+	producer *nsq.Producer
+
+//	respChan chan *nsq.ProducerTransaction
+)
+var cv service.ChannelValue
+
+//发送返回参数
+type ReturnSMS struct {
+	Code    int    `json:"code"`
+	Message string `json:"message"`
+}
+
+type TopicMessage struct {
+	Topic   string `json:"topic"`
+	Payload string `json:"payload"`
+}
+
+// 收到订阅的mqtt消息处理
+func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
+
+	// fmt.Printf("TOPIC: %s\n", msg.Topic())
+	// fmt.Printf("MSG: % X\n", msg.Payload())
+	SeedToControl(msg)
+	// datapoint, errv1 := service.Writedata(tagmac, boxmac, fmt.Sprintf("% X", command), cv.GpsBaseStation, wdvalue, sd, cv.Dy, cv.T2bxh, cv.Bbdy, 0, cjtimet, cstimet, bbtimet, bbnum, cv.Mxinh, cv.Sxuliehao, isreplace, cv.InfraredBarrier, o2, cv.Wd, co2, 0, cv.Lng, cv.Lat, cachevalue.DeviceState, cv.Wdtype, cv.ElectricalPower, cv.ElectricalSupply, cv.NTCR, cv.ThreeACVA, cv.ThreeACAA, cv.ThreeACVB, cv.ThreeACAB, cv.ThreeACVC, cv.ThreeACAC, cv.ThreeACW, cv.Doorlock)
+
+	// producer.Publish(service.Config.Send.Controller.Topic, msg.Payload())
+}
+
+func SeedToControl(msg mqtt.Message) {
+	var tms TopicMessage
+	fmt.Printf("TOPIC: %s\n", msg.Topic())
+	fmt.Printf("MSG: % X\n", msg.Payload())
+	code := strings.Split(msg.Topic(), "/")
+	tms.Topic = code[2]
+	fmt.Println("传感器:", code[1])
+	if code[1] == "o2" && len(msg.Payload()) == 7 {
+		ac := msg.Payload()
+		O2 := Co2ppmint(ac[3:5])
+		cv.O2 = O2 / 100
+		cv.SimSignal = 24
+		fmt.Println("cv.氧气浓度:", cv.O2)
+		cv.Time = time.Now()
+		WriteRedisAndIfluxdb(tms.Topic, cv, msg.Payload())
+	}
+	if code[1] == "co2" && len(msg.Payload()) == 7 {
+		ac := msg.Payload()
+		CO2 := Co2ppmint(ac[3:5])
+		cv.Co2 = CO2 / 10000
+		cv.SimSignal = 24
+		fmt.Println("cv.二氧化碳浓度:", cv.Co2)
+		cv.Time = time.Now()
+		WriteRedisAndIfluxdb(tms.Topic, cv, msg.Payload())
+	}
+	return
+}
+
+//二氧化碳 氧气
+func Co2ppmint(command []byte) float64 {
+	if len(command) == 2 {
+		var x int16
+		b_buf := bytes.NewBuffer(command)
+		binary.Read(b_buf, binary.BigEndian, &x)
+		c := float64(x) * 10
+		c_str := strconv.FormatFloat(c, 'f', 2, 64)
+		c_f64, _ := strconv.ParseFloat(c_str, 64)
+		return c_f64
+	}
+	return 0
+}
+
+//网络信号 CSQ
+func CSQ(command []byte) float64 {
+	return float64(command[0])
+}
+
+func BytesToString(b []byte) string {
+	bh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+	sh := reflect.StringHeader{bh.Data, bh.Len}
+	return *(*string)(unsafe.Pointer(&sh))
+}
+
+//写入数据库 redis
+func WriteRedisAndIfluxdb(tagmac string, cv service.ChannelValue, value []byte) {
+	cachevalue, _ := labsop.GetBoxLastdata(tagmac)
+	datapoint, errv1 := service.Writedata(tagmac, cv.BBMac, fmt.Sprintf("% X", value), cv.GpsBaseStation, cv.Temperature, cv.Humidity, cv.Voltage, cv.T2bxh, cv.BBVoltage, 0, cv.Time, cv.BBCSTime, cv.BBCSTime, cv.BBNo, cv.SimSignal, cv.SensorNo, 0, cv.InfraredBarrier, cv.O2, cv.Temperature, cv.Co2, 0, cv.Lng, cv.Lat, 0, 1, cv.ElectricalPower, cv.ElectricalSupply, cv.NTCR, cv.ThreeACVA, cv.ThreeACAA, cv.ThreeACVB, cv.ThreeACAB, cv.ThreeACVC, cv.ThreeACAC, cv.ThreeACW, cv.Doorlock)
+
+	cjutc := cv.Time.Unix()
+	if errv1 == nil && cjutc >= cachevalue.Time.Unix() {
+		//写入成功后才写入缓存
+		fmt.Println("----写入----", tagmac)
+		labsop.UpdateBoxLastdata(tagmac, datapoint)
+	}
+}
+
+func ApiPost(strUrl, method string, postDict interface{}) (listPtr ReturnSMS) {
+
+	httpClient := &http.Client{
+		//Transport:nil,
+		//CheckRedirect: nil,
+		Timeout: 6 * time.Second,
+	}
+
+	var httpReq *http.Request
+
+	b, _ := json.Marshal(postDict)
+	postBytesReader := bytes.NewReader(b)
+	httpReq, _ = http.NewRequest(method, strUrl, postBytesReader)
+	httpReq.Header.Add("Content-Type", "application/json")
+	response, _ := httpClient.Do(httpReq)
+	if response != nil {
+		body, _ := ioutil.ReadAll(response.Body)
+		json.Unmarshal(body, &listPtr)
+	}
+	return
+}
+
+func main() {
+	service.Config = service.ReadAppConfig("conf/app.conf")
+	//初始化缓存区
+	redis.InitRedis(service.Config.Send.Redis.Poolnum, service.Config.Send.Redis.Addr)
+	// Set up channel on which to send signal notifications.
+	termChan := make(chan os.Signal, 1)
+	signal.Notify(termChan, os.Interrupt, os.Kill, syscall.SIGINT, syscall.SIGTERM)
+
+	nsqCfg := nsq.NewConfig()
+	var err error
+	service.Producernsq, err = nsq.NewProducer(service.Config.Send.Controller.Nsqdtcpaddr, nsqCfg)
+	if err != nil {
+		log.Fatalf("failed creating producer %s", err)
+	}
+	opts := mqtt.NewClientOptions()
+	opts.AddBroker(service.Config.Receive.Mqtt.Serveraddr)
+	opts.SetClientID(service.Config.Receive.Mqtt.Clientid)
+	opts.SetUsername(service.Config.Receive.Mqtt.Username)
+	opts.SetPassword(service.Config.Receive.Mqtt.Password)
+	opts.SetDefaultPublishHandler(mqttMessageHandler)
+	opts.SetKeepAlive(60 * time.Second)
+	opts.SetCleanSession(false)
+	opts.SetPingTimeout(1 * time.Second)
+
+	mqttClient := mqtt.NewClient(opts)
+	if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
+		log.Fatal(token.Error())
+	}
+
+	if token := mqttClient.Subscribe(service.Config.Receive.Mqtt.Topic, 1, nil); token.Wait() && token.Error() != nil {
+		log.Fatal(token.Error())
+	}
+	fmt.Println("服务开启")
+
+	time.Sleep(6 * time.Second)
+
+	// Wait for receiving a signal.
+	<-termChan
+	mqttClient.Disconnect(250)
+
+}

+ 37 - 0
sg_m3_4g_mqtt/pub_test.go

@@ -0,0 +1,37 @@
+package main
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/eclipse/paho.mqtt.golang"
+)
+
+func TestPub(t *testing.T) {
+	opts := mqtt.NewClientOptions()
+	opts.AddBroker("tcp://39.98.34.197:1883")
+	opts.SetClientID("test")
+	opts.SetUsername("sg1")
+	opts.SetPassword("dashoopwd")
+	// opts.SetDefaultPublishHandler(messageHandler)
+	opts.SetKeepAlive(2 * time.Second)
+	opts.SetPingTimeout(1 * time.Second)
+
+	c := mqtt.NewClient(opts)
+	if token := c.Connect(); token.Wait() && token.Error() != nil {
+		panic(token.Error())
+	}
+
+	for i := 0; i < 3; i++ {
+		text := fmt.Sprintf("this is msg #%d!", i)
+		token := c.Publish("go-mqtt/sample", 0, false, text)
+		token.Wait()
+	}
+
+	time.Sleep(6 * time.Second)
+
+	c.Disconnect(250)
+
+	time.Sleep(1 * time.Second)
+}

+ 52 - 0
sg_m3_4g_mqtt/service/conf.go

@@ -0,0 +1,52 @@
+package service
+
+import (
+	"io/ioutil"
+	"os"
+
+	"github.com/naoina/toml"
+)
+
+type AppConfig struct {
+	Receive struct {
+		Description string
+		Mqtt        struct {
+			Clientid   string
+			Username   string
+			Password   string
+			Serveraddr string
+			Topic      string
+		}
+	}
+
+	Send struct {
+		Description string
+		Redis       struct {
+			Addr    string
+			Poolnum int
+		}
+		Controller struct {
+			ActionUrl   string
+			Nsqdtcpaddr string
+			Topic       string
+		}
+	}
+}
+
+var Config AppConfig
+
+func ReadAppConfig(path string) (config AppConfig) {
+	f, err := os.Open(path)
+	if err != nil {
+		panic(err)
+	}
+	defer f.Close()
+	buf, err := ioutil.ReadAll(f)
+	if err != nil {
+		panic(err)
+	}
+	if err := toml.Unmarshal(buf, &config); err != nil {
+		panic(err)
+	}
+	return
+}

+ 87 - 0
sg_m3_4g_mqtt/service/writedata.go

@@ -0,0 +1,87 @@
+package service
+
+import (
+	"encoding/json"
+	"time"
+
+	wdcc "dashoo.cn/labsop"
+	"github.com/nsqio/go-nsq"
+)
+
+//tag说明f:fields,t:tag,-:不赋值
+type ChannelValue struct {
+	Time             time.Time
+	TimeUnix         int64
+	Sequence         int64
+	Mxinh            int64
+	T2bxh            float64
+	Lat              float64   //纬度
+	Lng              float64   //经度
+	Speed            float64   //速度
+	Temperature      float64   //温度
+	Humidity         float64   //湿度
+	Voltage          float64   //电压
+	RSSI             float64   //信号 Signal (sensor到blackbox)
+	CSTime           time.Time //传输时间
+	SensorNo         int64     //Sensor序列号 Zbackup4
+	BBVoltage        float64   //bb电压
+	BBCSTime         time.Time //BB传输time  Zbackup2
+	BBMac            string    //bbmac
+	BBMacTag         string    //bbmactag
+	BBNo             int64     //bb序列号 SensorNo
+	SimSignal        int64     //移动信号  Zbackup3
+	SimIP            string    //sim900 IP Zbackup7
+	ServiceTime      time.Time //服务器时间  Zbackup5
+	RequestData      string    //请求原数据  Zbackup1
+	O2               float64   //氧气浓度  Zbackup8
+	SO2              float64   //二氧化硫  Zbackup8
+	Isreplace        int64     //是否替换(0未替换,1已替换)  Zbackup9
+	RealTemperature  float64   //原温度  Zbackup10
+	Co2              float64   //Co2
+	H2O              float64   //水质
+	WindSpeed        float64   //风速
+	Pressure         float64   //气压
+	Ots              float64   //压力大港
+	Displacement     float64   //位移
+	ClO2             float64   //二氧化氯
+	C2H4             float64   //乙烯
+	C2H2             float64   //乙炔
+	Cl2              float64   //氯气
+	O3               float64   //臭氧
+	TVOC             float64   //TVOC
+	Waterflow        float64   //waterflow
+	DeviceState      int64     // 监控设备状态
+	ElectricalPower  int64     // 电器功率
+	ElectricalSupply int64     // 市电电压
+	NTCR             float64   // Sensor端NTC热敏电阻
+	ThreeACVA        float64   // A相电压
+	ThreeACVB        float64   // B相电压
+	ThreeACVC        float64   // C相电压
+	ThreeACAA        float64   // A相电流
+	ThreeACAB        float64   // B相电流
+	ThreeACAC        float64   // C相电流
+	ThreeACW         float64   // 三相有功总电能
+	InfraredBarrier  int64     // 红外阻挡物(0 无阻碍物 1有障碍物 other 其他)
+	GpsBaseStation   string    // gps基站信息
+	LiquidLevel      float64   // 液位
+	Doorlock         float64   // 门锁
+}
+
+var Producernsq *nsq.Producer
+
+type blackboxddata struct {
+	Code      string
+	Datapoint wdcc.DatapointLabSop
+}
+
+func Writedata(tagmac, bbmac, zbackup1, gpsbasestation string, v1, v2, v3, v4, v5, v6 float64, cjtime, cstime, bbtime time.Time, xulieh, mobilexh, sensornum, isreplace, infraredbarrier int64, o2, realtemperature, co2, waterflow, lng, lat float64, devicestate, wdtype int, electricalpower, electricalsupply int64, ntrc, threeacva, threeacaa, threeacvb, threeacab, threeacvc, threeacac, threeacw, doorlock float64) (wdcc.DatapointLabSop, error) {
+	var data wdcc.DatapointLabSop
+	data = wdcc.DatapointLabSop{Time: cjtime, Temperature: v1, Humidity: v2, Voltage: v3, RSSI: v4, CSTime: cstime, BBVoltage: v5, BBMac: bbmac, BBNo: xulieh, RequestData: zbackup1, BBCSTime: bbtime, SimSignal: mobilexh, SensorNo: sensornum, ServiceTime: time.Now(), O2: o2, Isreplace: isreplace, RealTemperature: realtemperature, Co2: co2, Waterflow: waterflow, Lat: lat, Lng: lng, DeviceState: int64(devicestate), ElectricalPower: electricalpower, ElectricalSupply: electricalsupply, GpsBaseStation: gpsbasestation, NTCR: ntrc, ThreeACVA: threeacva, ThreeACVB: threeacvb, ThreeACVC: threeacvc, ThreeACAA: threeacaa, ThreeACAB: threeacab, ThreeACAC: threeacac, ThreeACW: threeacw, InfraredBarrier: infraredbarrier, Doorlock: doorlock}
+	bdata, err := json.Marshal(blackboxddata{tagmac, data})
+	if err == nil {
+		errnsq := Producernsq.Publish(Config.Send.Controller.Topic, bdata)
+		return data, errnsq
+		// return data, nil
+	}
+	return data, err
+}

BIN
sg_m3_4g_mqtt/sg_m3_4g_mqtt


+ 118 - 0
smartdevice_mqtt/business/trigger/trigger.go

@@ -0,0 +1,118 @@
+package trigger
+
+import (
+	"time"
+)
+
+type Triggers struct {
+	Id               int `xorm:"not null pk INT(11)"`
+	AId              int
+	AccCode          string
+	RemindTime       time.Time
+	ValidityDate     time.Time
+	KCCount          float32
+	ContentMode      string
+	Notification     int
+	TimeNotification int
+	KCMin            float32
+	KCMax            float32
+	SPara3           string
+	Feedback         int
+	FeedbackTime     int
+	SurplusCount     int
+	Category         int
+	Code             string
+	Switchalarm      int
+}
+type Channels struct {
+	Id                 int `xorm:"not null pk INT(11)"`
+	Title              string
+	DataItem           int
+	About              string
+	ProjectSourse      string
+	ProjectAccount     string
+	ProjectAccountName string
+	Well_ID            string
+}
+
+//报警历史
+type Record_History struct {
+	Id                 int
+	ProjectSourse      string
+	ProjectAccount     string
+	ProjectAccountName string
+	SendTo             string
+	ActionType         string
+	Message            string
+	IsRead             int
+	IsSend             int
+	SendErrorMsg       string
+	DeviceId           string
+	DeviceName         string
+	ChannelCode        string
+	ChannelName        string
+	CreateOn           time.Time
+	CreateUserId       int
+	CreateBy           string
+	ModifiedOn         time.Time
+	ModifiedUserId     int
+	ModifiedBy         string
+}
+
+type Channels_dg_oilwell struct {
+	Id        int `xorm:"not null pk INT(11)"`
+	WellCode  string
+	WellType  string
+	OrgCode   string
+	Well_Name string
+	Scalar    string
+	Vector    string
+	CreateOn  time.Time
+}
+type Channelslat struct {
+	Id       int `xorm:"not null pk INT(11)"`
+	Unit     string
+	UnitSign string
+}
+
+type TriggerAction struct {
+	To      string `json:"tomail"`
+	Subject string `json:"title"`
+	Body    string `json:"body"`
+}
+
+type TriggerActionSms struct {
+	To   string `json:"to"`
+	Body string `json:"body"`
+}
+
+type Series struct {
+	Name    string          `json:"name"`
+	Columns []string        `json:"columns"`
+	Points  [][]interface{} `json:"points"`
+}
+
+type UserInfo struct {
+	Id           int
+	UserName     string
+	UserPassword string
+	PublicKey    string
+}
+
+//冲减
+type AccountInfo struct {
+	Id                 int       `xorm:"not null pk autoincr INT(11)" json:"id"`
+	ProjectSourse      string    `xorm:"VARCHAR(50)" json:"sourse"`
+	ProjectAccount     string    `xorm:"VARCHAR(50)" json:"account"`
+	ProjectAccountName string    `xorm:"VARCHAR(50)" json:"name"`
+	ActionType         string    `xorm:"VARCHAR(50)" json:"type"`
+	SurplusCount       int       `xorm:"INT(10)" json:"surplus"`
+	StartTime          time.Time `xorm:"DATETIME" json:"start"`
+	EndTime            time.Time `xorm:"DATETIME" json:"end"`
+	CreateOn           time.Time `xorm:"DATETIME created"`
+	CreateUserId       int       `xorm:"INT(11)" form:"-"  json:"-"`
+	CreateBy           string    `xorm:"VARCHAR(50)"`
+	ModifiedOn         time.Time `xorm:"DATETIME updated" form:"-" json:"-"`
+	ModifiedUserId     int       `xorm:"INT(11)" form:"-"  json:"-"`
+	ModifiedBy         string    `xorm:"VARCHAR(50)"`
+}

+ 236 - 0
smartdevice_mqtt/business/trigger/triggerService.go

@@ -0,0 +1,236 @@
+package trigger
+
+import (
+	"fmt"
+	"strconv"
+	"time"
+
+	"dashoo.cn/utils"
+	. "dashoo.cn/utils/db"
+	"github.com/go-xorm/xorm"
+)
+
+type TriggerService struct {
+	ServiceBase
+}
+
+//出库管理子表
+type MaterialCKDetail struct {
+	Id             int       `xorm:"not null pk autoincr INT(11)"`
+	CKNo           string    `xorm:"VARCHAR(50)"`      //出库单号
+	ProjectId      int       `xorm:"INT(11)"`          //项目Id
+	MaterialId     int       `xorm:"INT(11)"`          //物料Id
+	Unit           string    `xorm:"VARCHAR(50)"`      //单位Name
+	BatchNo        string    `xorm:"VARCHAR(50)"`      //批次号
+	CKCount        float32   `xorm:"FLOAT"`            //数量
+	CKWeight       float32   `xorm:"FLOAT"`            //重量
+	UnitPrice      float32   `xorm:"FLOAT"`            //单价
+	TotalPrice     float32   `xorm:"FLOAT"`            //总价
+	Remark         string    `xorm:"TEXT"`             //备注
+	CreateOn       time.Time `xorm:"DATETIME created"` //创建时间
+	CreateUserId   int       `xorm:"INT(11)"`
+	CreateBy       string    `xorm:"VARCHAR(50)"` //创建人
+	ModifiedOn     time.Time `xorm:"DATETIME updated"`
+	ModifiedUserId int       `xorm:"INT(11)"`
+	ModifiedBy     string    `xorm:"VARCHAR(50)"`
+}
+
+func GetTriggerService(xormEngine *xorm.Engine) *TriggerService {
+	s := new(TriggerService)
+	s.DBE = xormEngine
+	return s
+}
+
+//时间查询
+func (s *TriggerService) GetEntitiesWithSearch(searchstring string) []Triggers {
+	sql := `select a.Id,a.AccCode,a.Feedback,a.AID,a.FeedbackTime,d.SurplusCount, a.ValidityDate,a.Id,a.Notification,a.TimeNotification,a.RemindTime,a.KCMin,a.KCMax, b.SPara3,c.KCCount,a.ContentMode,a.Switchalarm from Triggers   a  
+	inner join Actions b on a.AId=b.Id
+	inner join accountinfo d on d.Id=2925
+	inner join materialbatchkc c on a.MaterialbatchkcId=c.Id ` + searchstring
+	fmt.Println(sql)
+	list := make([]Triggers, 0)
+	s.DBE.Sql(sql).Find(&list)
+	return list
+}
+
+//类型更新
+func (s *TriggerService) Update_oilwell_type(scalar, vector, id string) error {
+	var err error
+
+	sql := "UPDATE channels_dg_oilwell SET Scalar='" + scalar + "', Vector ='" + vector + "' WHERE id= " + id
+	fmt.Println("-----sql---", sql)
+	_, err = s.DBE.Query(sql)
+
+	LogError(err)
+
+	return err
+}
+
+//油水井id
+func (s *TriggerService) Getysj_ID(searchstring string) int64 {
+	var err error
+	var total int64
+	sql := `select count(*) from channels_dg_oilwell  where id = ` + searchstring
+	resultsSlice, err := s.DBE.Query(sql)
+
+	LogError(err)
+	if len(resultsSlice) > 0 {
+		results := resultsSlice[0]
+		for _, value := range results {
+			total, err = strconv.ParseInt(string(value), 10, 64)
+			LogError(err)
+			break
+		}
+	}
+	return total
+}
+
+//油水井分解类型
+func (s *TriggerService) Getoilwell_type() []Channels_dg_oilwell {
+	sql := `select * from channels_dg_oilwell `
+	fmt.Println(sql)
+	list := make([]Channels_dg_oilwell, 0)
+	s.DBE.Sql(sql).Find(&list)
+	return list
+}
+
+//查询跟新开锁请求
+func (s *TriggerService) UpdateEntitybyId(id int, isSend float64, CreateOn int64) (err error) {
+	// isSends := strconv.FormatFloat(isSend, 'E', -1, 64)
+	difference := time.Now().Unix() - CreateOn
+	SendErrorMsg := strconv.FormatInt(difference, 10)
+	str_time := time.Now().Format("2006-01-02 15:04:05")
+	_, err = s.DBE.Exec(`update record_History set IsSend=  2,SendErrorMsg = ` + SendErrorMsg + `, ModifiedOn = "` + str_time + `"  where 
+		id=` + strconv.Itoa(id) + ``)
+	LogError(err)
+	if err != nil {
+		return err
+	}
+	return
+}
+
+//查询10秒内开锁请求
+func (s *TriggerService) GetHistoryByChannelCode(searchstring string, delay int) []Record_History {
+	//延迟
+	str_time := time.Unix(time.Now().Unix()-int64(delay), 0).Format("2006-01-02 15:04:05")
+	sql := `select * from record_history WHERE ChannelCode =  ` + searchstring + ` and CreateOn > '` + str_time + `'`
+	fmt.Println(sql)
+	list := make([]Record_History, 0)
+	s.DBE.Sql(sql).Find(&list)
+	return list
+}
+
+//名称查询 存储到redis
+func (s *TriggerService) GetEntitiesTRredis(searchstring string) []Channels {
+	sql := `select Title from channels  where Serial = ` + searchstring
+	fmt.Println(sql)
+	list := make([]Channels, 0)
+	s.DBE.Sql(sql).Find(&list)
+	return list
+}
+
+//经纬度查询
+func (s *TriggerService) GetEntitiesWithSearchlatlug(searchstring string) []Channelslat {
+	sql := `select Id,Unit,UnitSign from channels where Serial = ` + searchstring
+	fmt.Println(sql)
+	list := make([]Channelslat, 0)
+	s.DBE.Sql(sql).Find(&list)
+	return list
+}
+
+//库存数量计算
+func (s *TriggerService) Getmaterialckdetail(searchstring string) []MaterialCKDetail {
+	sql := `select  CKCount from materialckdetail where ` + searchstring
+	fmt.Println(sql)
+	list := make([]MaterialCKDetail, 0)
+	s.DBE.Sql(sql).Find(&list)
+	return list
+}
+
+//设备有效期
+func (s *TriggerService) GetEntitiesWithSearchdevice(searchstring string) []Triggers {
+	sql := `select a.Id, a.AccCode,a.Category,a.AId,d.SurplusCount,a.RemindTime,a.ValidityDate,a.ProductDate, a.FeedbackTime,b.SPara3,a.Switchalarm from Triggers  a  
+	inner join accountinfo d on d.Id=2925
+	inner join Actions b on a.AId=b.Id where a.Category = 3 ` + searchstring
+	fmt.Println(sql)
+	list := make([]Triggers, 0)
+	s.DBE.Sql(sql).Find(&list)
+	return list
+}
+
+//添加历史记录
+func (s *TriggerService) GetINSERTchannels_dg_oilwell(id, code, name string, welltype int64, orgCode string) error {
+	sql := "insert into channels_dg_oilwell(Id,WellCode,Well_Name,WellType,OrgCode,CreateOn) values(" + id + ",'" + code + "','" + name + "','" + strconv.FormatInt(welltype, 10) + "','" + orgCode + "',NOW())"
+
+	_, err := s.DBE.Exec(sql)
+	return err
+}
+
+//添加历史记录
+func (s *TriggerService) GetINSERTHIStr(SendTo, ChannelName, Message, ChannelCode, ProjectSourse, ProjectAccount, ProjectAccountName, ActionType string, AlarmOn, IsSend, EventFiled, Value, Remark string) (id int64, err error) {
+	sql := "insert into trigger_history(Id,CreateUserId,CreateBy,CreateOn,ChannelName,ChannelCode,ProjectSourse,ProjectAccount,ProjectAccountName,SendTo,Message,ActionType,IsRead,AlarmOn,IsSend,EventFiled,Value,Remark) values(Null,138,'seed平台',NOW(),'" + ChannelName + "','" + ChannelCode + "','" + ProjectSourse + "','" + ProjectAccount + "','" + ProjectAccountName + "','" + SendTo + "','" + Message + "','" + ActionType + "',0,'" + AlarmOn + "','" + IsSend + "','" + EventFiled + "','" + Value + "','" + Remark + "')"
+	// sql := "insert into trigger_history(Id,SendTo,Message,ActionType,IsRead,AlarmOn,IsSend) values(Null,'" + SendTo + "','" + Message + "','" + ActionType + "',0,NOW(),'" + IsSend + "')"
+	fmt.Println("----sql----", sql)
+	lastid, err := s.DBE.Exec(sql)
+	ids, _ := lastid.LastInsertId()
+	return ids, err
+}
+
+//库存为0删除查询
+func (s *TriggerService) DeletetrforId(searchstring string) []Triggers {
+	sql := `delete from Triggers where Id =` + searchstring
+	fmt.Println(sql)
+	list := make([]Triggers, 0)
+	s.DBE.Sql(sql).Find(&list)
+	return list
+}
+
+func (s *TriggerService) UpdateEntityBytbl(tablename string, id interface{}, entity interface{}, cols []string) (err error) {
+	session := s.DBE.NewSession()
+	defer session.Close()
+	err = session.Begin()
+	LogError(err)
+	if err != nil {
+		session.Rollback()
+		return err
+	}
+	_, err = s.DBE.Table(tablename).Id(id).Cols(cols...).Update(entity) //执行更新
+	LogError(err)
+	if err != nil {
+		session.Rollback()
+		return err
+	}
+	err = session.Commit()
+	if err != nil {
+		return err
+	}
+	return
+}
+
+func (s *TriggerService) GetPswById(userid int) (userinfo UserInfo) {
+	sql := `select a.UserName,a.UserPassword,a.PublicKey from Base_User a where a.Id=` + utils.ToStr(userid)
+	s.DBE.Sql(sql).Get(&userinfo)
+	return userinfo
+}
+
+//取报警记录中两小时内的最后一条数据的报警类型
+func (s *TriggerService) GetTriggerTypelastbytime(systime time.Time, minute, triggerid int) int {
+	var triggertype Id_Int
+	sql := `select TriggerType Id from Trigger_History a where
+		a.CreateOn>DATE_ADD('%v', INTERVAL %v MINUTE)
+		and a.TriggerId =%v
+		order by a.CreateOn desc limit 1`
+	sql = fmt.Sprintf(sql, utils.TimeFormat(systime, "2006-01-02 15:04:05"), minute, triggerid)
+	s.DBE.Sql(sql).Get(&triggertype)
+	return triggertype.Id
+}
+
+//取报警记录中最后一条数据的报警类型
+func (s *TriggerService) GetTriggerTypelast(triggerid string, createtime time.Time) int {
+	var triggertype Id_Int
+	sql := `select TriggerType Id from Trigger_History a where
+		a.TriggerId =` + triggerid + ` and CreateOn>'` + createtime.Format("2006-01-02 15:04:05") + `'
+		order by a.CreateOn desc limit 1`
+	s.DBE.Sql(sql).Get(&triggertype)
+	return triggertype.Id
+}

+ 32 - 0
smartdevice_mqtt/conf/app.conf

@@ -0,0 +1,32 @@
+[receive]
+description="从mqtt接收数据"
+
+[receive.mqtt]
+clientid = "rtu01"
+username = "sg1"
+password = "dashoopwd"
+serveraddr = "tcp://39.98.34.197:1883"
+topic = "sg/update/#"
+
+
+
+[send]
+description="数据转存到nsq"
+delay = 50
+
+[send.controller]
+actionUrl = "http://39.98.34.197:12001/api/remote_control/rtuudate"
+Nsqdtcpaddr = "39.98.34.197:9150"
+topic = "blackboxd2"
+
+[send.redis]
+addr = "39.98.34.197:26379"
+poolnum = 50
+
+[send.db]
+type = "mysql"
+addr = "rm-8vbk16zx2rbfu6jt6uo.mysql.zhangbei.rds.aliyuncs.com"
+db = "mcs_db"
+user = "mcs_user"
+password = "X6T1pa5o1AgO"
+ 

+ 213 - 0
smartdevice_mqtt/main.go

@@ -0,0 +1,213 @@
+package main
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"log"
+	"net/http"
+	"os"
+	"os/signal"
+	"strconv"
+	"strings"
+	"syscall"
+	"time"
+
+	"dashoo.cn/smartdevice_mqtt/business/trigger"
+
+	"github.com/go-xorm/xorm"
+
+	"dashoo.cn/utils"
+
+	"reflect"
+	"unsafe"
+
+	"dashoo.cn/labsop"
+	"dashoo.cn/utils/redis"
+
+	"dashoo.cn/smartdevice_mqtt/service"
+	"github.com/eclipse/paho.mqtt.golang"
+	"github.com/nsqio/go-nsq"
+)
+
+var (
+	producer *nsq.Producer
+
+//	respChan chan *nsq.ProducerTransaction
+)
+var cv service.ChannelValue
+
+//发送返回参数
+type ReturnSMS struct {
+	Code    int    `json:"code"`
+	Message string `json:"message"`
+}
+
+type TopicMessage struct {
+	Topic   string `json:"topic"`
+	Payload string `json:"payload"`
+}
+
+// 收到订阅的mqtt消息处理
+func mqttMessageHandler(client mqtt.Client, msg mqtt.Message) {
+
+	// fmt.Printf("TOPIC: %s\n", msg.Topic())
+	// fmt.Printf("MSG: % X\n", msg.Payload())
+	SeedToControl(msg)
+	// datapoint, errv1 := service.Writedata(tagmac, boxmac, fmt.Sprintf("% X", command), cv.GpsBaseStation, wdvalue, sd, cv.Dy, cv.T2bxh, cv.Bbdy, 0, cjtimet, cstimet, bbtimet, bbnum, cv.Mxinh, cv.Sxuliehao, isreplace, cv.InfraredBarrier, o2, cv.Wd, co2, 0, cv.Lng, cv.Lat, cachevalue.DeviceState, cv.Wdtype, cv.ElectricalPower, cv.ElectricalSupply, cv.NTCR, cv.ThreeACVA, cv.ThreeACAA, cv.ThreeACVB, cv.ThreeACAB, cv.ThreeACVC, cv.ThreeACAC, cv.ThreeACW, cv.Doorlock)
+
+	// producer.Publish(service.Config.Send.Controller.Topic, msg.Payload())
+}
+
+func SeedToControl(msg mqtt.Message) {
+	var tms TopicMessage
+	fmt.Printf("TOPIC: %s\n", msg.Topic())
+	fmt.Printf("MSG: % X\n", msg.Payload())
+	code := strings.Split(msg.Topic(), "/")
+	tms.Topic = code[2]
+	if len(msg.Payload()) == 1 {
+		v2, _ := strconv.ParseFloat(BytesToString(msg.Payload()), 64)
+		cv.Doorlock = v2
+		cv.Time = time.Now()
+		WriteRedisAndIfluxdb(tms.Topic, cv, msg.Payload(), utils.DBE)
+	}
+
+	// tms.Payload = fmt.Sprintf("%X", msg.Payload())
+	// ActionUrl := service.Config.Send.Controller.ActionUrl
+	// body := ApiPost(ActionUrl, "POST", tms)
+	// fmt.Println("", body.Code, body.Message)
+	return
+}
+
+//网络信号 CSQ
+func CSQ(command []byte) float64 {
+	return float64(command[0])
+}
+
+func BytesToString(b []byte) string {
+	bh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+	sh := reflect.StringHeader{bh.Data, bh.Len}
+	return *(*string)(unsafe.Pointer(&sh))
+}
+
+//写入数据库 redis
+func WriteRedisAndIfluxdb(tagmac string, cv service.ChannelValue, value []byte, dbe *xorm.Engine) {
+
+	svc := trigger.GetTriggerService(dbe)
+	list := svc.GetHistoryByChannelCode(tagmac, service.Config.Send.Delay)
+	for i := 0; i < len(list); i++ {
+		svc.UpdateEntitybyId(list[i].Id, cv.Doorlock, list[i].CreateOn.Unix())
+	}
+
+	cachevalue, _ := labsop.GetBoxLastdata(tagmac)
+	datapoint, errv1 := service.Writedata(tagmac, cv.BBMac, fmt.Sprintf("% X", value), cv.GpsBaseStation, cv.Temperature, cv.Humidity, cv.Voltage, cv.T2bxh, cv.BBVoltage, 0, cv.Time, cv.BBCSTime, cv.BBCSTime, cv.BBNo, cv.SimSignal, cv.SensorNo, 0, cv.InfraredBarrier, 0, cv.Temperature, 0, 0, cv.Lng, cv.Lat, 0, 1, cv.ElectricalPower, cv.ElectricalSupply, cv.NTCR, cv.ThreeACVA, cv.ThreeACAA, cv.ThreeACVB, cv.ThreeACAB, cv.ThreeACVC, cv.ThreeACAC, cv.ThreeACW, cv.Doorlock)
+
+	cjutc := cv.Time.Unix()
+	if errv1 == nil && cjutc >= cachevalue.Time.Unix() {
+		//写入成功后才写入缓存
+		fmt.Println("设备号", tagmac)
+		labsop.UpdateBoxLastdata(tagmac, datapoint)
+	}
+}
+
+func ApiPost(strUrl, method string, postDict interface{}) (listPtr ReturnSMS) {
+
+	httpClient := &http.Client{
+		//Transport:nil,
+		//CheckRedirect: nil,
+		Timeout: 6 * time.Second,
+	}
+
+	var httpReq *http.Request
+
+	b, _ := json.Marshal(postDict)
+	postBytesReader := bytes.NewReader(b)
+	httpReq, _ = http.NewRequest(method, strUrl, postBytesReader)
+	httpReq.Header.Add("Content-Type", "application/json")
+	response, _ := httpClient.Do(httpReq)
+	if response != nil {
+		body, _ := ioutil.ReadAll(response.Body)
+		json.Unmarshal(body, &listPtr)
+	}
+	return
+}
+
+func main() {
+	service.Config = service.ReadAppConfig("conf/app.conf")
+	//初始化缓存区
+	redis.InitRedis(service.Config.Send.Redis.Poolnum, service.Config.Send.Redis.Addr)
+	utils.InitDb(service.Config.Send.Db.Type, service.Config.Send.Db.Addr, service.Config.Send.Db.Db, service.Config.Send.Db.User, service.Config.Send.Db.Password)
+
+	// Set up channel on which to send signal notifications.
+	termChan := make(chan os.Signal, 1)
+	signal.Notify(termChan, os.Interrupt, os.Kill, syscall.SIGINT, syscall.SIGTERM)
+
+	nsqCfg := nsq.NewConfig()
+	var err error
+	service.Producernsq, err = nsq.NewProducer(service.Config.Send.Controller.Nsqdtcpaddr, nsqCfg)
+	if err != nil {
+		log.Fatalf("failed creating producer %s", err)
+	}
+	opts := mqtt.NewClientOptions()
+	opts.AddBroker(service.Config.Receive.Mqtt.Serveraddr)
+	opts.SetClientID(service.Config.Receive.Mqtt.Clientid)
+	opts.SetUsername(service.Config.Receive.Mqtt.Username)
+	opts.SetPassword(service.Config.Receive.Mqtt.Password)
+	opts.SetDefaultPublishHandler(mqttMessageHandler)
+	opts.SetKeepAlive(60 * time.Second)
+	opts.SetCleanSession(false)
+	opts.SetPingTimeout(1 * time.Second)
+	mqttClient := mqtt.NewClient(opts)
+	if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
+		log.Fatal(token.Error())
+	}
+
+	if token := mqttClient.Subscribe(service.Config.Receive.Mqtt.Topic, 1, nil); token.Wait() && token.Error() != nil {
+		log.Fatal(token.Error())
+	}
+	fmt.Println("服务开启")
+
+	// ticker := time.NewTicker(time.Second * 30)
+	// go func() {
+	// 	for _ = range ticker.C {
+	// 		str_time := time.Now().Format("2006-01-02 15:04:05")
+	// 		// mqttClient.Publish("sg/cmd/63000105", 0, true, "SHELL:@DO1=?")
+	// 		// time.Sleep(1 * time.Second)
+	// 		// mqttClient.Publish("sg/cmd/64100011", 0, true, "SHELL:@DO1=?")
+	// 		// time.Sleep(1 * time.Second)
+	// 		// mqttClient.Publish("sg/cmd/64100012", 0, true, "SHELL:@DO1=?")
+	// 		// // mqttClient.Publish("sg/cmd/64100010", 1, true, "SHELL:@DO1=?")
+	// 		strUrl := "http://39.98.34.197:12001/api/remote_control/rtubuttonseedcmd?instruction=DO1=1&code=64100012&ygnick=all&ygopenid=oZ2X-weExJP7jTGlhknsxAnf12r4"
+	// 		s := Apiget(strUrl)
+	// 		time.Sleep(1 * time.Second)
+	// 		strUrl1 := "http://39.98.34.197:12001/api/remote_control/rtubuttonseedcmd?instruction=DO1=1&code=64100013&ygnick=all&ygopenid=oZ2X-weExJP7jTGlhknsxAnf12r4"
+	// 		s1 := Apiget(strUrl1)
+	// 		time.Sleep(1 * time.Second)
+	// 		strUrl2 := "http://39.98.34.197:12001/api/remote_control/rtubuttonseedcmd?instruction=DO1=1&code=64100014&ygnick=all&ygopenid=oZ2X-weExJP7jTGlhknsxAnf12r4"
+	// 		s2 := Apiget(strUrl2)
+	// 		fmt.Println("----发送----", str_time, string(s), string(s1), string(s2))
+	// 	}
+	// }()
+	time.Sleep(6 * time.Second)
+
+	//	if token := seed/sg01.Unsubscribe("seed/sg01"); token.Wait() && token.Error() != nil {
+	//		fmt.Println(token.Error())
+	//		os.Exit(1)
+	//	}
+	// Wait for receiving a signal.
+	<-termChan
+	mqttClient.Disconnect(250)
+
+}
+
+func Apiget(str string) (body []byte) {
+	str = strings.Replace(str, " ", "%20", -1)
+	response, _ := http.Get(str)
+	if response != nil {
+		defer response.Body.Close()
+		body, _ = ioutil.ReadAll(response.Body)
+	}
+	//json.Unmarshal(body, &devices)
+	return
+}

+ 37 - 0
smartdevice_mqtt/pub_test.go

@@ -0,0 +1,37 @@
+package main
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/eclipse/paho.mqtt.golang"
+)
+
+func TestPub(t *testing.T) {
+	opts := mqtt.NewClientOptions()
+	opts.AddBroker("tcp://39.98.34.197:1883")
+	opts.SetClientID("test")
+	opts.SetUsername("sg1")
+	opts.SetPassword("dashoopwd")
+	// opts.SetDefaultPublishHandler(messageHandler)
+	opts.SetKeepAlive(2 * time.Second)
+	opts.SetPingTimeout(1 * time.Second)
+
+	c := mqtt.NewClient(opts)
+	if token := c.Connect(); token.Wait() && token.Error() != nil {
+		panic(token.Error())
+	}
+
+	for i := 0; i < 3; i++ {
+		text := fmt.Sprintf("this is msg #%d!", i)
+		token := c.Publish("go-mqtt/sample", 0, false, text)
+		token.Wait()
+	}
+
+	time.Sleep(6 * time.Second)
+
+	c.Disconnect(250)
+
+	time.Sleep(1 * time.Second)
+}

+ 60 - 0
smartdevice_mqtt/service/conf.go

@@ -0,0 +1,60 @@
+package service
+
+import (
+	"io/ioutil"
+	"os"
+
+	"github.com/naoina/toml"
+)
+
+type AppConfig struct {
+	Receive struct {
+		Description string
+		Mqtt        struct {
+			Clientid   string
+			Username   string
+			Password   string
+			Serveraddr string
+			Topic      string
+		}
+	}
+
+	Send struct {
+		Description string
+		Delay       int
+		Redis       struct {
+			Addr    string
+			Poolnum int
+		}
+		Db struct {
+			Type     string
+			Addr     string
+			Db       string
+			User     string
+			Password string
+		}
+		Controller struct {
+			ActionUrl   string
+			Nsqdtcpaddr string
+			Topic       string
+		}
+	}
+}
+
+var Config AppConfig
+
+func ReadAppConfig(path string) (config AppConfig) {
+	f, err := os.Open(path)
+	if err != nil {
+		panic(err)
+	}
+	defer f.Close()
+	buf, err := ioutil.ReadAll(f)
+	if err != nil {
+		panic(err)
+	}
+	if err := toml.Unmarshal(buf, &config); err != nil {
+		panic(err)
+	}
+	return
+}

+ 87 - 0
smartdevice_mqtt/service/writedata.go

@@ -0,0 +1,87 @@
+package service
+
+import (
+	"encoding/json"
+	"time"
+
+	wdcc "dashoo.cn/labsop"
+	"github.com/nsqio/go-nsq"
+)
+
+//tag说明f:fields,t:tag,-:不赋值
+type ChannelValue struct {
+	Time             time.Time
+	TimeUnix         int64
+	Sequence         int64
+	Mxinh            int64
+	T2bxh            float64
+	Lat              float64   //纬度
+	Lng              float64   //经度
+	Speed            float64   //速度
+	Temperature      float64   //温度
+	Humidity         float64   //湿度
+	Voltage          float64   //电压
+	RSSI             float64   //信号 Signal (sensor到blackbox)
+	CSTime           time.Time //传输时间
+	SensorNo         int64     //Sensor序列号 Zbackup4
+	BBVoltage        float64   //bb电压
+	BBCSTime         time.Time //BB传输time  Zbackup2
+	BBMac            string    //bbmac
+	BBMacTag         string    //bbmactag
+	BBNo             int64     //bb序列号 SensorNo
+	SimSignal        int64     //移动信号  Zbackup3
+	SimIP            string    //sim900 IP Zbackup7
+	ServiceTime      time.Time //服务器时间  Zbackup5
+	RequestData      string    //请求原数据  Zbackup1
+	O2               float64   //氧气浓度  Zbackup8
+	SO2              float64   //二氧化硫  Zbackup8
+	Isreplace        int64     //是否替换(0未替换,1已替换)  Zbackup9
+	RealTemperature  float64   //原温度  Zbackup10
+	Co2              float64   //Co2
+	H2O              float64   //水质
+	WindSpeed        float64   //风速
+	Pressure         float64   //气压
+	Ots              float64   //压力大港
+	Displacement     float64   //位移
+	ClO2             float64   //二氧化氯
+	C2H4             float64   //乙烯
+	C2H2             float64   //乙炔
+	Cl2              float64   //氯气
+	O3               float64   //臭氧
+	TVOC             float64   //TVOC
+	Waterflow        float64   //waterflow
+	DeviceState      int64     // 监控设备状态
+	ElectricalPower  int64     // 电器功率
+	ElectricalSupply int64     // 市电电压
+	NTCR             float64   // Sensor端NTC热敏电阻
+	ThreeACVA        float64   // A相电压
+	ThreeACVB        float64   // B相电压
+	ThreeACVC        float64   // C相电压
+	ThreeACAA        float64   // A相电流
+	ThreeACAB        float64   // B相电流
+	ThreeACAC        float64   // C相电流
+	ThreeACW         float64   // 三相有功总电能
+	InfraredBarrier  int64     // 红外阻挡物(0 无阻碍物 1有障碍物 other 其他)
+	GpsBaseStation   string    // gps基站信息
+	LiquidLevel      float64   // 液位
+	Doorlock         float64   // 门锁
+}
+
+var Producernsq *nsq.Producer
+
+type blackboxddata struct {
+	Code      string
+	Datapoint wdcc.DatapointLabSop
+}
+
+func Writedata(tagmac, bbmac, zbackup1, gpsbasestation string, v1, v2, v3, v4, v5, v6 float64, cjtime, cstime, bbtime time.Time, xulieh, mobilexh, sensornum, isreplace, infraredbarrier int64, o2, realtemperature, co2, waterflow, lng, lat float64, devicestate, wdtype int, electricalpower, electricalsupply int64, ntrc, threeacva, threeacaa, threeacvb, threeacab, threeacvc, threeacac, threeacw, doorlock float64) (wdcc.DatapointLabSop, error) {
+	var data wdcc.DatapointLabSop
+	data = wdcc.DatapointLabSop{Time: cjtime, Temperature: v1, Humidity: v2, Voltage: v3, RSSI: v4, CSTime: cstime, BBVoltage: v5, BBMac: bbmac, BBNo: xulieh, RequestData: zbackup1, BBCSTime: bbtime, SimSignal: 24, SensorNo: sensornum, ServiceTime: time.Now(), O2: o2, Isreplace: isreplace, RealTemperature: realtemperature, Co2: co2, Waterflow: waterflow, Lat: lat, Lng: lng, DeviceState: int64(devicestate), ElectricalPower: electricalpower, ElectricalSupply: electricalsupply, GpsBaseStation: gpsbasestation, NTCR: ntrc, ThreeACVA: threeacva, ThreeACVB: threeacvb, ThreeACVC: threeacvc, ThreeACAA: threeacaa, ThreeACAB: threeacab, ThreeACAC: threeacac, ThreeACW: threeacw, InfraredBarrier: infraredbarrier, Doorlock: doorlock}
+	bdata, err := json.Marshal(blackboxddata{tagmac, data})
+	if err == nil {
+		errnsq := Producernsq.Publish(Config.Send.Controller.Topic, bdata)
+		return data, errnsq
+		// return data, nil
+	}
+	return data, err
+}

BIN
smartdevice_mqtt/smartdevice_mqtt