// Command main is the launcher that keeps a box registered with the CMS,
// publishes telemetry over MQTT and executes remote requests received on the
// box's request topic.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"os/exec"
	"strconv"
	"strings"
	"time"

	_ "beetai_lattepanda/action_engine"
	"beetai_lattepanda/box"
	"beetai_lattepanda/file"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// AutoServer : enables the automatic register/connect/publish loop in main
// when set to 1.
var AutoServer = 1

// MqttCmsBi : the MQTT client used to talk to the CMS broker. It is nil until
// MqttBegin has been called.
var MqttCmsBi mqtt.Client

// NOTE(review): the broker address and credentials below are hard-coded in the
// source; consider loading them from configuration instead.

// CmsHostBi : MQTT broker URL.
const CmsHostBi string = "tcp://broker.beetai.com:21883"

// CmsAccessTokenBi : MQTT user name.
const CmsAccessTokenBi = "beeetai"

// CmsPassBi : MQTT password.
const CmsPassBi = "5ige8TdfTEHoTgJz"

// idBox is the box identifier; it is filled in by CreateTopic.
var idBox = ""

// CmsTopicIn : topic the box publishes telemetry to (box -> server).
var CmsTopicIn = "v1/devices/" + idBox + "/telemetry"

// CmsTopicOut : topic the box subscribes to for requests (server -> box).
var CmsTopicOut = "v1/devices/" + idBox + "/request/+"

// StatusSSH : 1 after an "open_ssh" request has been handled, 0 after
// "close_ssh".
var StatusSSH = 0

// MqttStt : true while the MQTT session is considered established.
//
// NOTE(review): MqttStt, SttUpdate, SttPub and StatusSSH are written from the
// MQTT callbacks and from the MqttServer goroutine and read from main without
// synchronisation.
var MqttStt = false

// SttUpdate : true while an engine update/upgrade is in progress; telemetry
// publishing is suspended meanwhile.
var SttUpdate = false

// SttPub : false after the server sent "inactive_box" or "delete_box";
// telemetry publishing is suspended while it is false.
var SttPub = true

// number_repub counts connection-lost events.
var number_repub = 0

// number_resub counts invocations of the on-connect handler.
var number_resub = 0

// Pwd : parameters of a "change_passwd" request.
type Pwd struct {
	Password string `json:"password"`
}

// Ports : parameters of an "open_ssh" request.
type Ports struct {
	Port string `json:"port"`
}

// port holds the parameters of the last "open_ssh" request that was applied.
var port Ports

// resubscribeWait bounds how long the on-connect handler waits for the
// unsubscribe to complete before subscribing again.
const resubscribeWait = time.Second

//============================> Main <==========================

// main prepares the log directories, stops any running engine, starts the
// periodic broker check and then loops forever: register the box if needed,
// (re)connect to MQTT and publish telemetry roughly every 3 seconds.
func main() {
	file.Check_path(box.Path_log_luncher)
	file.Check_path(box.Path_log_engine)
	fmt.Println("----------------------START MAIN -------------------------")
	box.Stop_check_engine()
	box.Stop_engine()

	// Re-check that the broker is reachable every 5 minutes.
	go MqttServer()

	for {
		if AutoServer == 1 {
			// Check_dir == 0 means the engine config is missing, i.e. the box
			// has not been registered yet; SetUp tries to register it.
			if box.Check_dir(box.Path_engine_config) == 0 && !box.SetUp() {
				fmt.Println("Box not register on CMS")
				time.Sleep(7 * time.Second) // wait before the next registration attempt
			} else {
				mqttEnsureAndPublish()
			}
		}
		// Sleep on every iteration so the loop never spins, even when
		// AutoServer is switched off.
		box.Sleep_ms(3000)
	}
}

// mqttEnsureAndPublish (re)establishes the MQTT session when it is down and
// publishes one telemetry message when publishing is currently allowed.
func mqttEnsureAndPublish() {
	if !MqttStt && CheckConnectionMQTT() {
		CreateTopic()
		// MqttBegin sets MqttStt according to the outcome of the connect, so
		// a failed connect is not mistaken for an established session.
		MqttBegin()
	}
	if MqttStt && !SttUpdate && SttPub {
		box.RunCheck()
		PublishData()
	}
}

//------------------ Function ---------------------

// CreateTopic : reads the box id and rebuilds the telemetry and request topic
// names from it.
func CreateTopic() {
	idBox = box.Get_idBox()
	CmsTopicIn = "v1/devices/" + idBox + "/telemetry"
	CmsTopicOut = "v1/devices/" + idBox + "/request/+"
	file.Println("idBox: " + idBox)
	file.Println("TOPIC IN :" + CmsTopicIn)
	file.Println("TOPIC OUT :" + CmsTopicOut)
}

// PublishData : publishes the current box status (box.Message_pub) on the
// telemetry topic with QoS 1 and waits for the result.
func PublishData() {
	if MqttCmsBi == nil {
		// Nothing to publish with; MqttBegin has not created a client yet.
		return
	}
	payload := box.Message_pub()
	token := MqttCmsBi.Publish(CmsTopicIn, 1, false, payload)
	if token.Wait() && token.Error() != nil {
		fmt.Printf("Error Publish message CMS BI: %v\n", token.Error())
		file.Write_log("Error Publish message CMS BI", box.Path_log_luncher)
		return
	}
	fmt.Println("Send message CMS BI")
}

// MqttBegin : creates a new MQTT client, connects it to the broker and
// subscribes to the request topic. MqttStt is set to true on success and to
// false on failure.
func MqttBegin() {
	// Release the previous client (if any) so it does not keep a second
	// session and a second subscription alive next to the new one.
	if MqttCmsBi != nil {
		MqttCmsBi.Disconnect(250)
	}

	opts := mqtt.NewClientOptions()
	opts.AddBroker(CmsHostBi)
	opts.SetUsername(CmsAccessTokenBi)
	opts.SetPassword(CmsPassBi)
	opts.SetCleanSession(true)
	opts.SetConnectionLostHandler(MQTTLostConnectHandler)
	opts.SetOnConnectHandler(MQTTOnConnectHandler)
	MqttCmsBi = mqtt.NewClient(opts)

	token := MqttCmsBi.Connect()
	if token.Wait() && token.Error() == nil {
		file.Println("MQTT CMS Beetsoft Connected\n")
		file.Write_log("MQTT CMS BeetsoftConnected\n", box.Path_log_luncher)
		MqttCmsBi.Subscribe(CmsTopicOut, 1, MqttMessageHandler)
		MqttStt = true
		return
	}

	MqttStt = false
	file.Write_log("MQTT CMS Beetsoft cant not Connected\n", box.Path_log_luncher)
	file.Println("MQTT CMS Beetsoft cant not Connected\n")
	fmt.Printf("Loi CMS Beetsoft : %v \n", token.Error())
	file.Println("-------------------\n")
}

// MQTTLostConnectHandler : called by the MQTT client when the connection is
// lost. It disconnects the client, marks the session as down so that main
// reconnects, and logs the event.
func MQTTLostConnectHandler(c mqtt.Client, err error) {
	c.Disconnect(10)
	MqttStt = false
	number_repub++
	file.Write_log("MQTT CMS Beetsoft Lost Connect\n", box.Path_log_luncher)
	file.Write_log("Number reconnect publish msg: "+strconv.Itoa(number_repub)+"\n", box.Path_log_luncher)
	fmt.Println(err)
}

// MQTTOnConnectHandler : called by the MQTT client after a connection has
// been established. It renews the subscription to the request topic.
func MQTTOnConnectHandler(client mqtt.Client) {
	number_resub++
	file.Write_log("Lostconnect chanel subcriber: MQTT_OnConnectHandler\n", box.Path_log_luncher)
	file.Write_log("Number reconnect subcriber msg: "+strconv.Itoa(number_resub)+"\n", box.Path_log_luncher)

	// The previous code slept for time.Sleep(10), i.e. 10 nanoseconds, which
	// does not give the unsubscribe any time to complete. Wait for the
	// unsubscribe itself instead, bounded by resubscribeWait.
	client.Unsubscribe(CmsTopicOut).WaitTimeout(resubscribeWait)
	client.Subscribe(CmsTopicOut, 1, MqttMessageHandler)
	file.Write_log("Recall function Subscribe MQTT\n", box.Path_log_luncher)
}

// MqttServer : checks every 5 minutes that the broker host is reachable and
// marks the MQTT session as down when it is not, so main reconnects.
func MqttServer() {
	for {
		if !CheckConnectionMQTT() {
			MqttStt = false
		}
		time.Sleep(5 * time.Minute)
	}
}

// CheckConnectionMQTT : reports whether the broker host answers to ping.
//
// It returns false when ping could not be run, when no reply was received, or
// when the output contains "Destination Host Unreachable".
func CheckConnectionMQTT() bool {
	// Each option and its value are passed as separate arguments.
	out, err := exec.Command("ping", "-c", "5", "-i", "0.2", "-w", "5", "broker.beetai.com").Output()
	text := string(out)

	// ping also exits non-zero when only some replies are missing, so an
	// error alone is not treated as "down": at least one reply line
	// ("... bytes from ...") must be present.
	if err != nil && !strings.Contains(text, "bytes from") {
		fmt.Println(err)
		file.Write_log("Disconnect Server MQTT...\n", box.Path_log_luncher)
		return false
	}
	if strings.Contains(text, "Destination Host Unreachable") {
		fmt.Println("Disconnect Server MQTT...")
		file.Write_log("Disconnect Server MQTT...\n", box.Path_log_luncher)
		return false
	}
	fmt.Println("Connected Server MQTT...")
	return true
}

/************************************************************************/

// mqttAckMsg builds the standard `{"method":..., "status": 1}` reply.
func mqttAckMsg(method string) string {
	return `{"method":"` + method + `","status": 1}`
}

// mqttSendAck logs the standard reply for method and publishes it on topic.
func mqttSendAck(c mqtt.Client, topic, method string) {
	msg := mqttAckMsg(method)
	file.Write_log("Message:"+msg+" \n", box.Path_log_luncher)
	CmsResponse(c, topic, msg)
}

// mqttDecodeParams converts the generic "params" value of a request into dst
// by re-encoding it as JSON.
func mqttDecodeParams(params interface{}, dst interface{}) error {
	raw, err := json.Marshal(params)
	if err != nil {
		return err
	}
	return json.Unmarshal(raw, dst)
}

// mqttJSONString returns s encoded as a JSON string literal (with quotes), so
// it can be embedded safely in a hand-built JSON message.
func mqttJSONString(s string) string {
	encoded, err := json.Marshal(s)
	if err != nil {
		return `""`
	}
	return string(encoded)
}

// mqttRunCmd executes a "cmd" request and returns its output. A command of
// the form "nano <path>" that mentions Config.txt is answered with the content
// of <path> instead of being executed.
func mqttRunCmd(cmdLine string) string {
	k := strings.Index(cmdLine, "nano")
	if k < 0 {
		out := box.Cmd_Box(cmdLine)
		fmt.Println("2")
		return out
	}
	// The path starts one character after "nano"; the length check avoids a
	// slice-out-of-range panic when nothing follows "nano".
	if strings.Contains(cmdLine, "Config.txt") && k+5 < len(cmdLine) {
		path := cmdLine[k+5:]
		file.Println(path)
		out := file.ReadFile(path)
		fmt.Println("0")
		return out
	}
	out := box.Cmd_Box(cmdLine)
	fmt.Println("1")
	return out
}

// MqttMessageHandler : callback for requests received on the request topic.
// The payload is a JSON object with a "method" and optional "params"; the
// reply is published on the same topic with "request" replaced by "response".
//
// NOTE(review): the "cmd" method runs a command supplied by the server on the
// box; make sure the broker is trusted and access-controlled.
func MqttMessageHandler(MqttBI mqtt.Client, message mqtt.Message) {
	fmt.Println("=================== MqttMessageHandler ====================")
	fmt.Printf("Message %s\n", message)
	fmt.Printf("TOPIC: %s\n", message.Topic())
	fmt.Printf("MSG:\n %s\n", message.Payload())
	fmt.Println("=========================== + = + ==========================")

	var list map[string]interface{}
	if err := json.NewDecoder(bytes.NewReader(message.Payload())).Decode(&list); err != nil {
		fmt.Printf("Error:%v\n", err)
		file.Write_log("Message:Loi form message\n", box.Path_log_luncher)
		return
	}

	// A missing or non-string method falls through to the default case.
	method, _ := list["method"].(string)
	topic := strings.Replace(message.Topic(), "request", "response", 1)

	switch method {
	case "change_passwd":
		// {"method" : "change_passwd","params": {"password": "123456@"}}
		file.Println("Changing password \n")
		var pwd Pwd
		if err := mqttDecodeParams(list["params"], &pwd); err != nil {
			fmt.Println(err)
		}
		fmt.Print(topic, "\n\n")
		// The new password is deliberately not printed.
		// NOTE(review): the call that applies the password is disabled, yet
		// status 1 is still reported to the server.
		//box.Set_passwd(pwd.Password)
		mqttSendAck(MqttBI, topic, "change_passwd")

	case "open_ssh":
		file.Println("Open port ssh\n")
		pVal := port.Port
		if StatusSSH == 0 {
			if err := mqttDecodeParams(list["params"], &port); err != nil {
				fmt.Println(err)
			}
			pVal = port.Port
			fmt.Println(pVal)
			box.Ssh_open(pVal)
		}
		StatusSSH = 1
		msg := `{"method":"open_ssh","status": 1,"params": {"port": ` + mqttJSONString(pVal) + `}}`
		file.Write_log("Message:"+msg+" \n", box.Path_log_luncher)
		CmsResponse(MqttBI, topic, msg)
		file.Println("Open Done ssh\n")

	case "close_ssh":
		file.Println("Close port ssh\n")
		if StatusSSH == 1 {
			box.Ssh_close()
			StatusSSH = 0
		}
		mqttSendAck(MqttBI, topic, "close_ssh")
		file.Println("Close Done ssh\n")

	case "update_engine":
		file.Println("Update engine\n")
		UpdateEngine()
		mqttSendAck(MqttBI, topic, "update_engine")
		file.Println("Update engine done\n")

	case "inactive_box":
		file.Println("Inactive Box\n")
		SttPub = false
		mqttSendAck(MqttBI, topic, "inactive_box")
		file.Println("Inactive Box done\n")

	case "active_box":
		file.Println("Active Box\n")
		SttPub = true
		mqttSendAck(MqttBI, topic, "active_box")
		file.Println("Active Box done\n")

	case "delete_box":
		file.Println("Delete box\n")
		SttPub = false
		mqttSendAck(MqttBI, topic, "delete_box")
		file.Println("Delete box done\n")

	case "getVersion":
		// box.Get_version() is embedded as-is.
		// NOTE(review): presumably it already returns valid JSON - confirm.
		msg := `{"method":"getVersion","params":` + box.Get_version() + `}`
		file.Write_log("Message:"+msg+" \n", box.Path_log_luncher)
		CmsResponse(MqttBI, topic, msg)
		file.Println("Get version done\n")

	case "reboot":
		file.Write_log("Message:reboot\n", box.Path_log_luncher)
		box.Reboot_box()

	case "cmd":
		// The plain acknowledgement is only logged; the reply that is sent
		// carries the command output.
		file.Write_log("Message:"+mqttAckMsg("cmd")+" \n", box.Path_log_luncher)
		cmdLine, _ := list["params"].(string)
		out := mqttRunCmd(cmdLine)
		// Encode the output as a JSON string so quotes and newlines in it
		// cannot break the reply.
		mg := `{"method":"cmd","status": ` + mqttJSONString("\n"+out+"\n") + `}`
		fmt.Println(mg)
		CmsResponse(MqttBI, topic, mg)

	case "upgrade_engine":
		file.Println(`list["method"] == "upgrade_engine"`)
		raw, err := json.Marshal(list["params"])
		if err != nil {
			fmt.Println(err)
		}
		msg := `{"method":"upgrade_engine","status":1}`
		file.Write_log("Message:"+msg+" \n", box.Path_log_luncher)
		CmsResponse(MqttBI, topic, msg)
		UpgradeEngine(string(raw))

	default:
		fmt.Println(list)
	}
}

// UpdateEngine : stops the engine and its watchdog, requests an update from
// box.Url and suspends telemetry publishing while doing so.
func UpdateEngine() {
	SttUpdate = true
	box.Stop_check_engine()
	box.Stop_engine()
	box.RequestUpdate(box.Url)
	SttUpdate = false
}

// CmsResponse : publishes msg on topic with QoS 0 as the box's reply to the
// server. The publish result is not awaited.
func CmsResponse(c mqtt.Client, topic string, msg string) {
	c.Publish(topic, 0, false, msg)
	file.Println("Publish message done")
}

// UpgradeEngine : writes the upgrade parameters to box.Path_engine_update and
// stops the engine and its watchdog.
//
// NOTE(review): SttUpdate stays true after this call, so telemetry publishing
// remains suspended; presumably another component finishes the upgrade and
// restarts the launcher - confirm.
func UpgradeEngine(params string) {
	box.Check_path(box.Path_upgrade)
	file.Create_file(box.Path_engine_update)
	file.Write_file(params, box.Path_engine_update)
	SttUpdate = true
	box.Stop_check_engine()
	box.Stop_engine()
}