// Command main is the launcher/supervisor that runs on a box: it keeps the
// engine checker running, maintains an MQTT session with the CMS broker,
// periodically publishes telemetry and executes commands received from the CMS.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"os/exec"
	"strconv"
	"strings"
	"time"

	_ "Bitable/action_engine"
	"Bitable/box"
	"Bitable/file"
	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// AutoServer enables the automatic supervise/update cycle when set to 1.
var AutoServer = 1

// MqttCmsBi is the MQTT client connected to the CMS broker.
var MqttCmsBi mqtt.Client

// CmsHostBi is the MQTT broker URL.
const CmsHostBi string = "tcp://broker.beetai.com:21883"

// CmsAccessTokenBi is the MQTT user name.
// NOTE(review): credentials are hard-coded in source; consider loading them
// from configuration instead.
const CmsAccessTokenBi = "beeetai"

// CmsPassBi is the MQTT password.
const CmsPassBi = "5ige8TdfTEHoTgJz"

// mqtt_disconnect_timeout is the quiesce time, in milliseconds, handed to
// Client.Disconnect.
const mqtt_disconnect_timeout uint = 5000

// mqtt_connect_timeout is the connect timeout expressed in seconds; it is
// multiplied by time.Second where it is used.
const mqtt_connect_timeout time.Duration = 5

var idBox = ""

// CmsTopicIn is the telemetry topic (box to server). It is rebuilt by
// CreateTopic once the box id is known.
var CmsTopicIn = "v1/devices/" + idBox + "/telemetry"

// CmsTopicOut is the request topic filter (server to box). It is rebuilt by
// CreateTopic once the box id is known.
var CmsTopicOut = "v1/devices/" + idBox + "/request/+"

// StatusSSH records whether the remote ssh tunnel is open (1) or closed (0).
var StatusSSH = 0

// SttNetwork is true while the broker host answers ping (see MqttServer).
var SttNetwork = false

// MqttStt is true while an MQTT session with the broker is believed to be up.
var MqttStt = false

// SttUpdate is true while an update is in progress; telemetry publishing and
// engine checks are suspended meanwhile.
var SttUpdate = false

// SttPub allows (true) or forbids (false) publishing telemetry; it is toggled
// by the active_box / inactive_box / delete_box commands.
var SttPub = true

// number_repub counts lost-connection events.
var number_repub = 0

// number_resub counts (re)connect events.
var number_resub = 0

// Pwd is the params payload of the change_passwd command.
type Pwd struct {
	Password string `json:"password"`
}

// Ports is the params payload of the open_ssh command.
type Ports struct {
	Port string `json:"port"`
}

var port Ports

// STT_test is a debugging flag; it is reset by MQTTOnConnectHandler.
var STT_test bool

// count is the sequence number sent by Ping_broker_sub.
var count = 0

// NOTE(review): the state flags above are read and written from the main
// loop, the MqttServer goroutine and the MQTT callbacks without any
// synchronization.

//============================> Main <==========================

// main prepares the log directories, stops any engine processes left over
// from a previous run, starts the network monitor and then runs the supervise
// cycle forever.
func main() {
	fmt.Println("FUC......U")
	file.Check_path(box.Path_log_luncher)
	file.Check_path(box.Path_log_engine)
	file.Write_log("<------------------------------------------------------->", box.Path_log_luncher)
	file.Write_log("<-------------------- START MAIN ----------------------->", box.Path_log_luncher)
	file.Write_log("<------------------------------------------------------->", box.Path_log_luncher)

	box.Stop_check_engine()
	box.Stop_engine()
	box.Stop_Server_Engine()

	// Probe the broker host in the background once a minute.
	go MqttServer()

	for {
		if AutoServer == 1 {
			superviseOnce()
		}
		// Sleep on every iteration so the loop cannot busy-spin when
		// AutoServer is switched off.
		box.Sleep_ms(3000)
	}
}

// superviseOnce runs one iteration of the supervise cycle: register the box
// if needed, keep the MQTT session alive, publish telemetry and make sure the
// engine checker is running.
func superviseOnce() {
	// A missing engine config means the box has not been declared on the CMS.
	registered := box.Check_dir(box.Path_engine_config) != 0
	if !registered && !box.SetUp() {
		file.Println("Box not register on CMS")
		time.Sleep(7 * time.Second) // wait before the next registration attempt
		return
	}

	if SttNetwork {
		ensureMqttSession()
		// Publish only when connected, not updating, and publishing is allowed.
		if MqttStt && !SttUpdate && SttPub {
			if registered {
				box.RunCheck()
			}
			PublishData()
		}
	} else {
		MqttStt = false
	}

	if !SttUpdate {
		box.RunCheck() // make sure the engine checker is running
	}
}

// ensureMqttSession (re)creates the MQTT session when MqttStt is false.
// MqttStt is then set from the real connection state, so a failed connect is
// retried on the next cycle instead of being recorded as a success.
func ensureMqttSession() {
	if MqttStt {
		return
	}
	CreateTopic()
	MqttBegin()
	MqttStt = MqttCmsBi != nil && MqttCmsBi.IsConnected()
}

//------------------ Function ---------------------

// Ping_broker_sub publishes a "sub_begin" test message carrying a rolling
// counter so that the subscribe path can be checked end to end.
// NOTE(review): it publishes to CmsTopicOut, which contains the "+" wildcard;
// wildcards are not valid in a publish topic, so this is expected to fail.
// No call site is visible in this file.
func Ping_broker_sub() {
	fmt.Println("Ping thu den sub broker")
	count = count + 1
	if count > 10000 {
		count = 0
	}
	payload := `{"method":"sub_begin","status":` + strconv.Itoa(count) + `}`
	token := MqttCmsBi.Publish(CmsTopicOut, 0, false, payload)
	if token.Wait() && token.Error() != nil {
		fmt.Printf("Error Publish message CMS BI: %v\n", token.Error())
		file.Write_log("Error Publish Ping_broker_sub()", box.Path_log_luncher)
		return
	}
	fmt.Println("Ping_broker_sub() OKIE ")
	file.Write_log("Ping_broker_sub() OKIE", box.Path_log_luncher)
}

// CreateTopic reads the box id and rebuilds the telemetry and request topics
// from it.
func CreateTopic() {
	idBox = box.Get_idBox()
	CmsTopicIn = "v1/devices/" + idBox + "/telemetry"
	CmsTopicOut = "v1/devices/" + idBox + "/request/+"
	file.Println("idBox: " + idBox)
	file.Println("TOPIC IN :" + CmsTopicIn)
	file.Println("TOPIC OUT :" + CmsTopicOut)
}

// PublishData publishes the telemetry payload built by box.Message_pub to
// CmsTopicIn with QoS 1 and waits for the result.
func PublishData() {
	payload := box.Message_pub()
	token := MqttCmsBi.Publish(CmsTopicIn, 1, false, payload)
	if token.Wait() && token.Error() != nil {
		file.Write_log("Error Publish message CMS BI", box.Path_log_luncher)
		return
	}
	file.Println("Send message CMS BI")
}

// newCmsClientOptions builds the client options shared by MqttBegin and
// MqttBegin_test; only the connection callbacks differ between the two.
func newCmsClientOptions(onLost mqtt.ConnectionLostHandler, onConnect mqtt.OnConnectHandler) *mqtt.ClientOptions {
	opts := mqtt.NewClientOptions()
	opts.AddBroker(CmsHostBi)
	opts.SetUsername(CmsAccessTokenBi)
	opts.SetPassword(CmsPassBi)
	opts.SetCleanSession(true)
	opts.SetConnectionLostHandler(onLost)
	opts.SetOnConnectHandler(onConnect)
	return opts
}

// MqttBegin creates a new MQTT client, connects it to the CMS broker and
// subscribes to CmsTopicOut. On failure it sets MqttStt to false.
func MqttBegin() {
	opts := newCmsClientOptions(MQTTLostConnectHandler, MQTTOnConnectHandler)

	// Drop the previous session first (as MqttBegin_test does); otherwise an
	// auto-reconnecting old client would keep a second subscription alive and
	// every command would be handled twice.
	if MqttCmsBi != nil && MqttCmsBi.IsConnected() {
		MqttCmsBi.Disconnect(mqtt_disconnect_timeout)
	}

	MqttCmsBi = mqtt.NewClient(opts)
	if token := MqttCmsBi.Connect(); token.Wait() && token.Error() == nil {
		fmt.Print("MQTT CMS Beetsoft Connected\n\n")
		file.Write_log("MQTT CMS BeetsoftConnected\n", box.Path_log_luncher)
		MqttCmsBi.Subscribe(CmsTopicOut, 0, MqttMessageHandler)
		return
	}

	file.Write_log("MQTT CMS Beetsoft cant not Connected\n", box.Path_log_luncher)
	MqttStt = false
	fmt.Print("MQTT CMS Beetsoft cant not Connect\n\n")
	file.Println("-------------------\n")
}

/************************************************************************/

// mosquitto_LostConnect_Handler logs the reason the test connection was lost.
func mosquitto_LostConnect_Handler(c mqtt.Client, err error) {
	fmt.Printf("Mosquitto LostConnect, reason: %v\n", err)
}

// mosquitto_OnConnect_Handler subscribes the test client to CmsTopicOut with
// a callback that only prints incoming messages.
func mosquitto_OnConnect_Handler(c mqtt.Client) {
	fmt.Println("OnConnect Mosquitto")
	c.Subscribe(CmsTopicOut, 0, DomoticzSubscribe_CallBack)
}

// DomoticzSubscribe_CallBack prints the topic and payload of a message.
func DomoticzSubscribe_CallBack(c mqtt.Client, message mqtt.Message) {
	fmt.Printf("TOPIC: %s\n", message.Topic())
	fmt.Printf("MSG:\n %s\n", message.Payload())
}

/************************************************************************/

// MqttBegin_test is a variant of MqttBegin that uses the mosquitto_* test
// callbacks and bounds the connect attempt with mqtt_connect_timeout seconds.
func MqttBegin_test() {
	opts := newCmsClientOptions(mosquitto_LostConnect_Handler, mosquitto_OnConnect_Handler)

	if MqttCmsBi != nil && MqttCmsBi.IsConnected() {
		MqttCmsBi.Disconnect(mqtt_disconnect_timeout)
	}

	MqttCmsBi = mqtt.NewClient(opts)
	token := MqttCmsBi.Connect()
	// WaitTimeout only reports that the token completed in time; the connect
	// itself may still have failed, so the token error is checked as well.
	if token.WaitTimeout(mqtt_connect_timeout*time.Second) && token.Error() == nil {
		file.Println("MQTT CMS Beetsoft Connected\n")
		file.Write_log("MQTT CMS BeetsoftConnected\n", box.Path_log_luncher)
		MqttCmsBi.Subscribe(CmsTopicOut, 0, MqttMessageHandler)
		return
	}

	file.Write_log("MQTT CMS Beetsoft cant not Connected\n", box.Path_log_luncher)
	MqttStt = false
	file.Println("MQTT CMS Beetsoft cant not Connected\n")
	file.Println("-------------------\n")
}

// MQTTLostConnectHandler is called by the MQTT client when the connection to
// the broker is lost; it counts and logs the event.
func MQTTLostConnectHandler(c mqtt.Client, err error) {
	number_repub = number_repub + 1
	file.Write_log("MQTT CMS Beetsoft Lost Connect\n", box.Path_log_luncher)
	file.Write_log("Number reconnect publish msg: "+strconv.Itoa(number_repub)+"\n", box.Path_log_luncher)
	fmt.Printf("Mosquitto LostConnect, reason: %v\n", err)
}

// MQTTOnConnectHandler is called by the MQTT client on every (re)connect; it
// counts the event and (re)subscribes to CmsTopicOut so that commands keep
// arriving after a reconnect.
func MQTTOnConnectHandler(client mqtt.Client) {
	fmt.Println("MQTTOnConnectHandler()")
	number_resub = number_resub + 1
	file.Write_log("Lostconnect chanel subcriber: MQTT_OnConnectHandler\n", box.Path_log_luncher)
	file.Write_log("Number reconnect subcriber msg: "+strconv.Itoa(number_resub)+"\n", box.Path_log_luncher)

	if token := client.Subscribe(CmsTopicOut, 0, MqttMessageHandler); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		file.Write_log("Subscriber Error MQTT\n", box.Path_log_luncher)
	} else {
		fmt.Println("Subcriber is MQTTOnConnectHandler () OKIE ")
		file.Write_log("Subcriber is MQTTOnConnectHandler () OKIE\n ", box.Path_log_luncher)
	}
	STT_test = false
}

// MqttServer probes the broker host once a minute and records the result in
// SttNetwork; when the host is unreachable it also clears MqttStt so that the
// main loop re-creates the session later. It never returns.
func MqttServer() {
	for {
		if CheckConnectionMQTT() {
			SttNetwork = true
		} else {
			MqttStt = false
			SttNetwork = false
			file.Write_log("============>>Network not Foud ! <==========\n ", box.Path_log_luncher)
		}
		time.Sleep(1 * time.Minute)
	}
}

// CheckConnectionMQTT reports whether the broker host answers ping.
// It returns false when the ping command fails or when its output contains
// "Destination Host Unreachable".
func CheckConnectionMQTT() bool {
	// Each flag and value is its own argv entry and the options precede the
	// host, which every getopt implementation accepts.
	out, err := exec.Command("ping", "-c", "5", "-i", "0.2", "-w", "5", "broker.beetai.com").Output()
	if err != nil {
		fmt.Println(err)
		file.Write_log("Ping broker.beetai.com timeout ---> recall Mqttbegin() \n", box.Path_log_luncher)
		return false
	}
	if strings.Contains(string(out), "Destination Host Unreachable") {
		file.Println("Destination Host Unreachable MQTT")
		file.Write_log("Destination Host Unreachable MQTT\n", box.Path_log_luncher)
		return false
	}
	file.Println("Ping broker.beetai.com OKIE...")
	return true
}

/************************************************************************/

// mqttResponseTopic derives the response topic from a request topic by
// replacing the first "request" with "response".
func mqttResponseTopic(requestTopic string) string {
	return strings.Replace(requestTopic, "request", "response", 1)
}

// mqttStatusOK builds the standard acknowledgement payload for method.
func mqttStatusOK(method string) string {
	return `{"method":"` + method + `","status": ` + strconv.Itoa(1) + `}`
}

// replyStatusOK logs the standard acknowledgement for method and publishes it
// on topic.
func replyStatusOK(c mqtt.Client, topic string, method string) {
	msg := mqttStatusOK(method)
	file.Write_log("Message:"+msg+" \n", box.Path_log_luncher)
	CmsResponse(c, topic, msg)
}

// decodeParams converts the already-decoded params value into dst by
// round-tripping it through JSON.
func decodeParams(params interface{}, dst interface{}) error {
	raw, err := json.Marshal(params)
	if err != nil {
		return err
	}
	return json.Unmarshal(raw, dst)
}

// runBoxCommand executes the "cmd" request. A command of the form
// "nano <path>" that mentions Config.txt is answered with the content of the
// file instead of being executed; everything else goes to box.Cmd_Box.
func runBoxCommand(m string) string {
	k := strings.Index(m, "nano")
	pathStart := k + len("nano ")
	switch {
	case k < 0:
		out := box.Cmd_Box(m)
		file.Println("2")
		return out
	case strings.Contains(m, "Config.txt") && pathStart <= len(m):
		// The bounds check guards against "nano" sitting at the very end of
		// the command, which would otherwise make the slice panic.
		path := m[pathStart:]
		file.Println(path)
		out := file.ReadFile(path)
		file.Println("0")
		return out
	default:
		out := box.Cmd_Box(m)
		file.Println("1")
		return out
	}
}

// MqttMessageHandler is the subscription callback for CmsTopicOut. It decodes
// the JSON request, dispatches on its "method" field and, for most methods,
// publishes an acknowledgement on the matching response topic. Requests that
// are not valid JSON are logged and dropped; unknown methods are ignored.
//
// NOTE(review): the raw payload is printed, which includes the password of a
// change_passwd request.
func MqttMessageHandler(MqttBI mqtt.Client, message mqtt.Message) {
	fmt.Println("=================== MqttMessageHandler ====================")
	fmt.Printf("Message %s\n", message)
	fmt.Printf("TOPIC: %s\n", message.Topic())
	fmt.Printf("MSG:\n %s\n", message.Payload())
	fmt.Println("=========================== + = + ==========================")

	var list map[string]interface{}
	if err := json.NewDecoder(bytes.NewReader(message.Payload())).Decode(&list); err != nil {
		file.Write_log("Message:Loi form message\n", box.Path_log_luncher)
		return
	}

	topic := mqttResponseTopic(message.Topic())
	method, _ := list["method"].(string)

	switch method {
	case "change_passwd":
		// Example: {"method" : "change_passwd","params": {"password": "123456@"}}
		file.Println("Changing password \n")
		var pwd Pwd
		if err := decodeParams(list["params"], &pwd); err != nil {
			file.Write_log("Message:change_passwd invalid params: "+err.Error()+"\n", box.Path_log_luncher)
		}
		// NOTE(review): the password is decoded but not applied; the request
		// is acknowledged with status 1 regardless.
		replyStatusOK(MqttBI, topic, "change_passwd")

	case "open_ssh":
		file.Println("Open port ssh\n")
		if StatusSSH == 0 {
			if err := decodeParams(list["params"], &port); err != nil {
				file.Write_log("Message:open_ssh invalid params: "+err.Error()+"\n", box.Path_log_luncher)
			}
			pVal := port.Port
			file.Println(pVal)
			file.Println("Bat ssh")
			file.Println("Out ssh")
			// NOTE(review): pVal comes from the network and is neither
			// validated nor escaped before being embedded in the reply and
			// passed to box.Ssh_open.
			msg := `{"method":"open_ssh","status": ` + strconv.Itoa(1) + `,"params": {"port": "` + pVal + `"}}`
			file.Write_log("Message response cms:"+msg+" \n", box.Path_log_luncher)
			CmsResponse(MqttBI, topic, msg)
			box.Ssh_open(pVal)
			file.Println("chay lenh xong")
		}
		StatusSSH = 1
		file.Println("Open Done ssh\n")

	case "close_ssh":
		file.Println("Close port ssh\n")
		if StatusSSH == 1 {
			box.Ssh_close()
			StatusSSH = 0
		}
		replyStatusOK(MqttBI, topic, "close_ssh")
		file.Println("Close Done ssh\n")

	case "update_engine":
		file.Println("Update engine\n")
		UpdateEngine()
		replyStatusOK(MqttBI, topic, "update_engine")
		file.Println("Update engine done\n")

	case "sub_begin":
		// Test message sent by Ping_broker_sub.
		file.Write_log("======>> Subcriber is OKIE <========== \n", box.Path_log_luncher)

	case "update_model":
		file.Println("Update_model \n")
		fmt.Printf("%v--%T", list["id_box_engine"], list["id_box_engine"])
		msg := mqttStatusOK("update_model")
		file.Write_log("Message:"+msg+" \n", box.Path_log_luncher)
		if SttUpdate {
			// Another update is already running; the request is dropped.
			break
		}
		id, ok := list["id_box_engine"].(float64)
		if !ok {
			// A missing or non-numeric id must not crash the handler.
			file.Write_log("Message:update_model invalid id_box_engine\n", box.Path_log_luncher)
			break
		}
		UpdateModule(int(id))
		CmsResponse(MqttBI, topic, msg)
		file.Println("UpdateModule done\n")

	case "inactive_box":
		file.Println("Inactive Box\n")
		SttPub = false
		replyStatusOK(MqttBI, topic, "inactive_box")
		file.Println("Inactive Box done\n")

	case "active_box":
		file.Println("Active Box\n")
		SttPub = true
		replyStatusOK(MqttBI, topic, "active_box")
		file.Println("Active Box done\n")

	case "delete_box":
		file.Println("Delete box\n")
		SttPub = false
		replyStatusOK(MqttBI, topic, "delete_box")
		file.Println("Delete box done\n")

	case "getVersion":
		msg := `{"method":"getVersion","params":` + box.Get_version() + `}`
		file.Write_log("Message:"+msg+" \n", box.Path_log_luncher)
		CmsResponse(MqttBI, topic, msg)
		file.Println("Get version done\n")

	case "reboot":
		file.Write_log("Message:reboot\n", box.Path_log_luncher)
		box.Reboot_box()

	case "cmd":
		// NOTE(review): this runs a command string received over MQTT;
		// anyone able to publish on the request topic can execute commands
		// on the box.
		msg := mqttStatusOK("cmd")
		file.Write_log("Message:"+msg+" \n", box.Path_log_luncher)
		m, _ := list["params"].(string)
		out := runBoxCommand(m)
		// NOTE(review): out is embedded verbatim, so the reply is not valid
		// JSON when the output contains quotes or newlines.
		mg := `{"method":"cmd","status": "` + "\n" + out + "\n" + `"}`
		file.Println(mg)
		CmsResponse(MqttBI, topic, mg)

	case "upgrade_engine":
		file.Println(`list["method"] == "upgrade_engine"`)
		s, err := json.Marshal(list["params"])
		if err != nil {
			file.Write_log("Message:upgrade_engine invalid params: "+err.Error()+"\n", box.Path_log_luncher)
		}
		msg := `{"method":"upgrade_engine","status":` + strconv.Itoa(1) + `}`
		file.Write_log("Message:"+msg+" \n", box.Path_log_luncher)
		CmsResponse(MqttBI, topic, msg)
		UpgradeEngine(string(s))
	}
}

// UpdateModule requests an update of the module identified by id_engine.
// SttUpdate is held true for the duration of the request.
func UpdateModule(id_engine int) {
	file.Write_log("-------- START UPDATE MODULE ----------", box.Path_log_luncher)
	SttUpdate = true
	box.RequestUpdate_Module(box.Url_module, id_engine)
	SttUpdate = false
	file.Write_log("---------------------------------------", box.Path_log_luncher)
}

// UpdateEngine stops the engine checker and the engine, then requests an
// engine update. SttUpdate is held true for the duration of the request.
func UpdateEngine() {
	SttUpdate = true
	box.Stop_check_engine()
	box.Stop_engine()
	box.RequestUpdate(box.Url)
	SttUpdate = false
}

// CmsResponse publishes msg on topic with QoS 0. The publish is not awaited,
// so the "done" line only means that the message was handed to the client.
func CmsResponse(c mqtt.Client, topic string, msg string) {
	c.Publish(topic, 0, false, msg)
	file.Println("Publish message done")
}

// UpgradeEngine writes params to box.Path_engine_update and stops the engine
// checker and the engine.
// NOTE(review): SttUpdate is set to true and not reset here; presumably the
// upgrade is completed by another component — confirm.
func UpgradeEngine(params string) {
	box.Check_path(box.Path_upgrade)
	file.Create_file(box.Path_engine_update)
	file.Write_file(params, box.Path_engine_update)
	SttUpdate = true
	box.Stop_check_engine()
	box.Stop_engine()
}