【STM32+W5500+MQTT+ONENET】27,MQTT协议接入OneNET实际编程操作 2018年12月27日

您所在的位置:网站首页 w5500协议 【STM32+W5500+MQTT+ONENET】27,MQTT协议接入OneNET实际编程操作 2018年12月27日

【STM32+W5500+MQTT+ONENET】27,MQTT协议接入OneNET实际编程操作 2018年12月27日

2023-07-18 11:29| 来源: 网络整理| 查看: 265

0.先汇总,再逐步介绍各个部分的问题。

打开socket(SOCK_TCPC,Sn_MR_TCP,local_port++,Sn_MR_ND);

》》 /*socket连接服务器*/ connect(SOCK_TCPC,onenet_server_ip,onenet_server_port);

》》ONENET device link 设备连接 OneNet_DevLink();

 

接下来对服务器ONENET数据的解析

/*接收来自Server的数据*/   recv(SOCK_TCPC,buff,len);  

》》平台返回数据检测OneNet_RevPro(unsigned char *cmd)  

》》type = MQTT_UnPacketRecv(cmd); //接收数据命令的解析

》》case MQTT_PKT_CMD:   //命令下发                         result = MQTT_UnPacketCmd(cmd, &cmdid_topic, &req_payload);         //解出topic和消息体

1,由于连接到ONENET不是一次性就成功的,所以必须要多请求。

     而且socket经常关闭,在socket关闭后必须再次连接上ONENET

if(!onenet_flag)      MQTT_STATE = MQTT_PKT_CONNECT;//非常有必要

2, do_tcp_client()

因为socket经常关闭,在socket关闭后必须再次连接上ONENET,因此设置onenet_flag=0;

case SOCK_CLOSED: /*socket处于关闭状态*/ socket(SOCK_TCPC,Sn_MR_TCP,local_port++,Sn_MR_ND); onenet_flag=0; /* 非常重要,因为socket经常关闭,在socket关闭后必须再次连接上ONENET */ printf("SOCK_CLOSED \r\n"); break; #include #include #include "tcp_demo.h" #include "W5500_conf.h" #include "w5500.h" #include "socket.h" #include "onenet.h" #include "MqttKit.h" #include "utility.h" #include "bsp_led.h" uint8 buff[2048]; /*定义一个2KB的缓存*/ uint8 onenet_server_ip[4] = {183,230,40,39}; //OneNET服务器IP地址 uint16 onenet_server_port = 6002; //OneNET服务器端口号 int MQTT_STATE = MQTT_PKT_CONNECT; //连接 const char *topics[] = {"wisioe"}; uint8 onenet_ping_pak[2] = {0xC0,0x00}; unsigned char *data_ptr = NULL; extern uint8 onenet_flag; /** *@brief TCP Client回环演示函数。 *@param 无 *@return 无 */ void do_tcp_client(void) { uint16 len=0; switch(getSn_SR(SOCK_TCPC)) /*获取socket的状态*/ { case SOCK_CLOSED: /*socket处于关闭状态*/ socket(SOCK_TCPC,Sn_MR_TCP,local_port++,Sn_MR_ND); onenet_flag=0; printf("SOCK_CLOSED \r\n"); break; case SOCK_INIT: /*socket处于初始化状态*/ connect(SOCK_TCPC,onenet_server_ip,onenet_server_port); /*socket连接服务器*/ printf("SOCK_INIT \r\n"); break; case SOCK_ESTABLISHED: /*socket处于连接建立状态*/ // printf("SOCK_ESTABLISHED \r\n"); 只是测试的时候用一下就好 if(getSn_IR(SOCK_TCPC) & Sn_IR_CON) { setSn_IR(SOCK_TCPC, Sn_IR_CON); /*清除接收中断标志位*/ } //只有当ONENET发消息给W5500才进入 len=getSn_RX_RSR(SOCK_TCPC); /*定义len为已接收数据的长度*/ // printf("len=getSn_RX_RSR(SOCK_TCPC) %d\r\n",len); if(len>0) { recv(SOCK_TCPC,buff,len); /*接收来自Server的数据*/ data_ptr = buff; printf("receive 1:%s\r\n ",buff); if(data_ptr != NULL) { OneNet_RevPro(data_ptr); //第一次 delay_ms(100); printf("receive 2:%s\r\n ",data_ptr); } delay_ms(100); printf("receive 3:%s\r\n ",publish_buf); if(publish_buf[0] == 0x31) { delay_ms(100); printf("receive 4:%d\r\n ",publish_buf[0]); LED_ALL_ON; } else if(publish_buf[0] == 0x32) { delay_ms(100); printf("receive 5:%d\r\n ",publish_buf[1]); LED_ALL_OFF; } } //每次300ms的延时都进入此循环查询 switch(MQTT_STATE) { /*MQTT协议连接OneNET代理平台*/ case MQTT_PKT_CONNECT: //在初始化的时候赋值为MQTT_STATE=MQTT_PKT_CONNECT OneNet_DevLink(); //刚刚测试,确实连接上了 // MQTT_STATE = MQTT_PKT_PINGREQ; //存在于枚举 enum MqttPacketType break; /*订阅主题*/ case MQTT_PKT_SUBSCRIBE: OneNet_Subscribe(topics,1); MQTT_STATE = MQTT_PKT_PINGREQ; break; /*Qos2级别发布消息*/ case MQTT_PKT_PUBLISH: OneNet_Publish("wisioe", "MQTT Publish Test"); //发布消息 delay_ms(300); //等待平台响应 /*接收平台发送的PubRec并回复PubRel响应*/ len=getSn_RX_RSR(SOCK_TCPC); recv(SOCK_TCPC,buff,len); data_ptr = buff; if(data_ptr != NULL) OneNet_RevPro(data_ptr); delay_ms(100); //PubRel响应等待平台响应 len=getSn_RX_RSR(SOCK_TCPC); recv(SOCK_TCPC,buff,len); data_ptr = buff; if(data_ptr != NULL) OneNet_RevPro(data_ptr); MQTT_STATE = MQTT_PKT_PINGREQ; /*120秒发送一次Ping响应保持长连接*/ break; case MQTT_PKT_UNSUBSCRIBE: MQTT_UnSubscribe(topics,1); MQTT_STATE = MQTT_PKT_PINGREQ; break; case MQTT_PKT_PINGREQ: if(onenet_ping_time > 120) { send(SOCK_TCPC,onenet_ping_pak,2); onenet_ping_time = 0; } break; } break; case SOCK_CLOSE_WAIT: /*socket处于等待关闭状态*/ printf("SOCK_CLOSE_WAIT \r\n"); close(SOCK_TCPC); break; } }

3,OneNet_DevLink()连接ONENET

case 0:printf( "Tips:    连接成功\r\n"); MQTT_STATE = MQTT_PKT_PINGREQ; onenet_flag=1;  break;

 //在连接上ONENET平台之后,将MQTT_STATE = MQTT_PKT_PINGREQ; onenet_flag=1;,非常关键

extern uint8 onenet_flag; //========================================================== // 函数名称: OneNet_DevLink // 函数功能: 与onenet创建连接 // 入口参数: 无 // 返回参数: 1-成功 0-失败 // 说明: 与onenet平台建立连接 //========================================================== void OneNet_DevLink(void) { int len; MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0}; //mqtt协议包 ,NULL就是以后需要发送的数据数组 unsigned char *dataPtr; printf( "OneNet_DevLink\r\nPRODUCT_ID: %s\r\nAUIF: %s, DEV_ID:%s\r\n",PRODUCT_ID, DEV_APIKEY, DEV_ID); if(MQTT_PacketConnect(PRODUCT_ID, DEV_APIKEY, DEV_ID, 256, 0, MQTT_QOS_LEVEL0, NULL, NULL, 0, &mqttPacket) == 0) { send(SOCK_TCPC,mqttPacket._data,mqttPacket._len); //TCP协议传输,准备与ONENET平台建立连接 delay_ms(100); //等待平台响应 len=getSn_RX_RSR(SOCK_TCPC); recv(SOCK_TCPC,w5500_buf,len); dataPtr = w5500_buf; if(dataPtr != NULL) { if(MQTT_UnPacketRecv(dataPtr) == MQTT_PKT_CONNACK) { switch(MQTT_UnPacketConnectAck(dataPtr)) { case 0:printf( "Tips: 连接成功\r\n"); MQTT_STATE = MQTT_PKT_PINGREQ; onenet_flag=1; break; //在连接上ONENET平台之后,将MQTT_STATE = MQTT_PKT_PINGREQ; onenet_flag=1;,非常关键 case 1:printf("WARN: 连接失败:协议错误\r\n");break; case 2:printf("WARN: 连接失败:非法的clientid\r\n");break; case 3:printf("WARN: 连接失败:服务器失败\r\n");break; case 4:printf("WARN: 连接失败:用户名或密码错误\r\n");break; case 5:printf("WARN: 连接失败:非法链接(比如token非法)\r\n");break; default:printf("ERR: 连接失败:未知错误\r\n");break; } } } MQTT_DeleteBuffer(&mqttPacket); //删包 } else printf( "WARN: MQTT_PacketConnect Failed\r\n"); }

4,MQTT_PacketConnect()打包连接ONENET平台的函数,具体的定义可以看我前面一章26的定义。可以正确连接上ONENET,说明此函数的定义没有问题

//========================================================== // 函数名称: MQTT_PacketConnect // // 函数功能: 连接消息组包 // // 入口参数: user:用户名:产品ID // password:密码:鉴权信息或apikey // devid:设备ID // cTime:连接保持时间 // clean_session:离线消息清除标志 // qos:重发标志 // will_topic:异常离线topic // will_msg:异常离线消息 // will_retain:消息推送标志 // mqttPacket:包指针 // // 返回参数: 0-成功 其他-失败 // // 说明: //========================================================== uint8 MQTT_PacketConnect(const int8 *user, const int8 *password, const int8 *devid, uint16 cTime, uint1 clean_session, uint1 qos, const int8 *will_topic, const int8 *will_msg, int32 will_retain, MQTT_PACKET_STRUCTURE *mqttPacket) { uint8 flags = 0; uint8 will_topic_len = 0; uint16 total_len = 15; int16 len = 0, devid_len = strlen(devid); if(!devid) return 1; total_len += devid_len + 2; //断线后,是否清理离线消息:1-清理 0-不清理-------------------------------------------- if(clean_session) { flags |= MQTT_CONNECT_CLEAN_SESSION; } //异常掉线情况下,服务器发布的topic------------------------------------------------------ if(will_topic) { flags |= MQTT_CONNECT_WILL_FLAG; will_topic_len = strlen(will_topic); total_len += 4 + will_topic_len + strlen(will_msg); } //qos级别--主要用于PUBLISH(发布态)消息的,保证消息传递的次数----------------------------- switch((unsigned char)qos) { case MQTT_QOS_LEVEL0: flags |= MQTT_CONNECT_WILL_QOS0; //最多一次 break; case MQTT_QOS_LEVEL1: flags |= (MQTT_CONNECT_WILL_FLAG | MQTT_CONNECT_WILL_QOS1); //最少一次 break; case MQTT_QOS_LEVEL2: flags |= (MQTT_CONNECT_WILL_FLAG | MQTT_CONNECT_WILL_QOS2); //只有一次 break; default: return 2; } //主要用于PUBLISH(发布态)的消息,表示服务器要保留这次推送的信息,如果有新的订阅者出现,就把这消息推送给它。如果不设那么推送至当前订阅的就释放了 if(will_retain) { flags |= (MQTT_CONNECT_WILL_FLAG | MQTT_CONNECT_WILL_RETAIN); } //账号为空 密码为空--------------------------------------------------------------------- if(!user || !password) { return 3; } flags |= MQTT_CONNECT_USER_NAME | MQTT_CONNECT_PASSORD; total_len += strlen(user) + strlen(password) + 4; //分配内存----------------------------------------------------------------------------- MQTT_NewBuffer(mqttPacket, total_len); if(mqttPacket->_data == NULL) return 4; memset(mqttPacket->_data, 0, total_len); /*************************************固定头部***********************************************/ //固定头部----------------------连接请求类型--------------------------------------------- mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_CONNECT _data + mqttPacket->_len); if(len < 0) { MQTT_DeleteBuffer(mqttPacket); return 5; } else mqttPacket->_len += len; /*************************************可变头部***********************************************/ //可变头部----------------------协议名长度 和 协议名-------------------------------------- mqttPacket->_data[mqttPacket->_len++] = 0; mqttPacket->_data[mqttPacket->_len++] = 4; mqttPacket->_data[mqttPacket->_len++] = 'M'; mqttPacket->_data[mqttPacket->_len++] = 'Q'; mqttPacket->_data[mqttPacket->_len++] = 'T'; mqttPacket->_data[mqttPacket->_len++] = 'T'; //可变头部----------------------protocol level 4----------------------------------------- mqttPacket->_data[mqttPacket->_len++] = 4; //可变头部----------------------连接标志(该函数开头处理的数据)----------------------------- mqttPacket->_data[mqttPacket->_len++] = flags; //可变头部----------------------保持连接的时间(秒)---------------------------------------- mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(cTime); mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(cTime); /*************************************消息体************************************************/ //消息体----------------------------devid长度、devid------------------------------------- mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(devid_len); mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(devid_len); strncat((int8 *)mqttPacket->_data + mqttPacket->_len, devid, devid_len); mqttPacket->_len += devid_len; //消息体----------------------------will_flag 和 will_msg--------------------------------- if(flags & MQTT_CONNECT_WILL_FLAG) { unsigned short mLen = 0; if(!will_msg) will_msg = ""; mLen = strlen(will_topic); mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(mLen); mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(mLen); strncat((int8 *)mqttPacket->_data + mqttPacket->_len, will_topic, mLen); mqttPacket->_len += mLen; mLen = strlen(will_msg); mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(mLen); mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(mLen); strncat((int8 *)mqttPacket->_data + mqttPacket->_len, will_msg, mLen); mqttPacket->_len += mLen; } //消息体----------------------------use--------------------------------------------------- if(flags & MQTT_CONNECT_USER_NAME) { unsigned short user_len = strlen(user); mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(user_len); mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(user_len); strncat((int8 *)mqttPacket->_data + mqttPacket->_len, user, user_len); mqttPacket->_len += user_len; } //消息体----------------------------password---------------------------------------------- if(flags & MQTT_CONNECT_PASSORD) { unsigned short psw_len = strlen(password); mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(psw_len); mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(psw_len); strncat((int8 *)mqttPacket->_data + mqttPacket->_len, password, psw_len); mqttPacket->_len += psw_len; } return 0; }

5,在socket建立后,接收ONENET下发的命令

case SOCK_ESTABLISHED: /*socket处于连接建立状态*/ // printf("SOCK_ESTABLISHED \r\n"); 只是测试的时候用一下就好 if(getSn_IR(SOCK_TCPC) & Sn_IR_CON) { setSn_IR(SOCK_TCPC, Sn_IR_CON); /*清除接收中断标志位*/ } //只有当ONENET发消息给W5500才进入 len=getSn_RX_RSR(SOCK_TCPC); /*定义len为已接收数据的长度*/ // printf("len=getSn_RX_RSR(SOCK_TCPC) %d\r\n",len); if(len>0) { recv(SOCK_TCPC,buff,len); /*接收来自Server的数据*/ data_ptr = buff; printf("receive 1:%s\r\n ",buff); if(data_ptr != NULL) { OneNet_RevPro(data_ptr); //第一次 delay_ms(100); printf("receive 2:%s\r\n ",data_ptr); } delay_ms(100); printf("receive 3:%s\r\n ",publish_buf); if(publish_buf[0] == 0x31) { delay_ms(100); printf("receive 4:%d\r\n ",publish_buf[0]); LED_ALL_ON; } else if(publish_buf[0] == 0x32) { delay_ms(100); printf("receive 5:%d\r\n ",publish_buf[1]); LED_ALL_OFF; } }

6,下面三个函数连着使用,其中数组req_payload[]就是对我们有用的下发命令

void OneNet_RevPro(unsigned char *cmd);

type = MQTT_UnPacketRecv(cmd); //接收数据包的解析

result = MQTT_UnPacketCmd(cmd, &cmdid_topic, &req_payload);   //解出topic和消息体

//========================================================== // 函数名称: OneNet_RevPro // // 函数功能: 平台返回数据检测 // // 入口参数: dataPtr:平台返回的数据 // // 返回参数: 无 // // 说明: //========================================================== void OneNet_RevPro(unsigned char *cmd) { MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0}; //协议包 char *req_payload = NULL; char *cmdid_topic = NULL; unsigned char type = 0; unsigned char qos = 0; static unsigned short pkt_id = 0; short result = 0; char *dataPtr = NULL; char numBuf[10]; int num = 0; type = MQTT_UnPacketRecv(cmd); //接收数据包的解析 switch(type) { case MQTT_PKT_CMD: //命令下发 result = MQTT_UnPacketCmd(cmd, &cmdid_topic, &req_payload); //解出topic和消息体 if(result == 0)//对命令解析成功 { printf("cmdid:%s\r\n接收的命令为req:%s\r\n", cmdid_topic, req_payload); if(memcmp(req_payload,"publish",7)==0) { printf("接收命令publish解析成功\r\n "); MQTT_STATE =MQTT_PKT_PUBLISH; //MQTT_PacketSaveData(DEV_ID, int16 send_len, int8 *type_bin_head, 5, *mqttPacket); } else if(memcmp(req_payload,"subscribe",9)==0) { printf("接收命令subscribe解析成功\r\n "); MQTT_STATE = MQTT_PKT_SUBSCRIBE; } else printf("接收命令到其他的命令\r\n "); if(MQTT_PacketCmdResp(cmdid_topic, req_payload, &mqttPacket) == 0) //命令回复组包 response { printf( "Tips: Send CmdResp\r\n"); send(SOCK_TCPC,mqttPacket._data, mqttPacket._len); //回复命令 printf( "mqttPacket._data:%s\r\n",mqttPacket._data); MQTT_DeleteBuffer(&mqttPacket); //删包 } } break; case MQTT_PKT_PUBLISH: //接收的Publish消息 result = MQTT_UnPacketPublish(cmd, &cmdid_topic, &req_payload, &qos, &pkt_id); if(result == 0) { printf("topic: %s\r\npayload: %s\r\n", cmdid_topic, req_payload); memcpy(publish_buf,req_payload,strlen(req_payload)); switch(qos) { case 1: //收到publish的qos为1,设备需要回复Ack if(MQTT_PacketPublishAck(pkt_id, &mqttPacket) == 0) { printf( "Tips: Send PublishAck\r\n"); send(SOCK_TCPC,mqttPacket._data, mqttPacket._len); MQTT_DeleteBuffer(&mqttPacket); } break; case 2: //收到publish的qos为2,设备先回复Rec //平台回复Rel,设备再回复Comp if(MQTT_PacketPublishRec(pkt_id, &mqttPacket) == 0) { printf("Tips: Send PublishRec\r\n"); send(SOCK_TCPC,mqttPacket._data, mqttPacket._len); MQTT_DeleteBuffer(&mqttPacket); } break; default: break; } } break; case MQTT_PKT_PUBACK: //发送Publish消息,平台回复的Ack if(MQTT_UnPacketPublishAck(cmd) == 0) printf("Tips: MQTT Publish Send OK\r\n"); break; case MQTT_PKT_PUBREC: //发送Publish消息,平台回复的Rec,设备需回复Rel消息 if(MQTT_UnPacketPublishRec(cmd) == 0) { printf("Tips: Rev PublishRec\r\n"); if(MQTT_PacketPublishRel(MQTT_PUBLISH_ID, &mqttPacket) == 0) { printf ("Tips: Send PublishRel\r\n"); send(SOCK_TCPC,mqttPacket._data, mqttPacket._len); MQTT_DeleteBuffer(&mqttPacket); } } break; case MQTT_PKT_PUBREL: //收到Publish消息,设备回复Rec后,平台回复的Rel,设备需再回复Comp if(MQTT_UnPacketPublishRel(cmd, pkt_id) == 0) { printf("Tips: Rev PublishRel\r\n"); if(MQTT_PacketPublishComp(MQTT_PUBLISH_ID, &mqttPacket) == 0) { printf( "Tips: Send PublishComp\r\n"); send(SOCK_TCPC,mqttPacket._data, mqttPacket._len); MQTT_DeleteBuffer(&mqttPacket); } } break; case MQTT_PKT_PUBCOMP: //发送Publish消息,平台返回Rec,设备回复Rel,平台再返回的Comp if(MQTT_UnPacketPublishComp(cmd) == 0) { printf("Tips: Rev PublishComp\r\n"); } break; case MQTT_PKT_SUBACK: //发送Subscribe消息的Ack if(MQTT_UnPacketSubscribe(cmd) == 0) printf("Tips: MQTT Subscribe OK\r\n"); else printf("Tips: MQTT Subscribe Err\r\n"); break; case MQTT_PKT_UNSUBACK: //发送UnSubscribe消息的Ack if(MQTT_UnPacketUnSubscribe(cmd) == 0) printf( "Tips: MQTT UnSubscribe OK\r\n"); else printf("Tips: MQTT UnSubscribe Err\r\n"); break; default: result = -1; break; } //ESP8266_Clear(); //清空缓存 if(result == -1) return; dataPtr = strchr(req_payload, '}'); //搜索'}' if(dataPtr != NULL && result != -1) //如果找到了 { dataPtr++; while(*dataPtr >= '0' && *dataPtr


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3