STM32 W5500 MQTT Client 发布订阅及断线重连

您所在的位置:网站首页 w5500断线 STM32 W5500 MQTT Client 发布订阅及断线重连

STM32 W5500 MQTT Client 发布订阅及断线重连

2024-07-16 19:33| 来源: 网络整理| 查看: 265

使用STM32 W5500做MQTT Client,使得数据上传broker,并接收broker传来的消息,并支持断网/拔网线再插入网线能够重新连接broker这样的功能,需要具备以下条件:

1、STM32 W5500基础入网配置,使能PC电脑端可以PING通W5500。

2、STM32 W5500的TCP Client收发数据的回环测试没有问题。

3、了解MQTT协议。

关于MQTT的介绍,本文不做重点。需要了解的是MQTT协议是基于TCP协议之上封装的协议。

关于MQTT Client依赖的MQTT支持库函数,下载地址 《MQTT C语言库函数》

这些库函数是干嘛的?

MQTT协议在STM32 W5500中使用的前提,首先通过TCP连接到broker指定的IP和端口。

然后需要发送MQTT连接的指令,这个指令内容是通过 "MQTTConnectClient.c"文件中的

1int MQTTSerialize_connect(unsigned char* buf, int buflen, MQTTPacket_connectData* options)

这个方法来实现组装的,返回值大于0表示组装后的数组有效内容长度,在通过W5500的send方法,发送给broker。

broker接收到Client端发来的MQTT连接请求后,会返回一组数据,判断是否连接成功,或者各种失败(协议版本错误,用户名密码错误等)。

MQTT Client如果想要接收到broker发来的消息,需要先订阅主题,订阅主题的指令内容是通过"MQTTSubscribeClient.c"文件中的

12int MQTTSerialize_subscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid, int count,         MQTTString topicFilters[], int requestedQoSs[])

这个方法来实现组装的,同样返回值大于0表示组装后的数组有效内容长度,在通过W5500的send方法,发送给broker。

以上两个举例都是组装指令内容。

那么接收到broker发来消息,如何解析?

"MQTTDeserializePublish.c"这个文件的

12int MQTTDeserialize_publish(unsigned char* dup, int* qos, unsigned char* retained, unsigned short* packetid, MQTTString* topicName,         unsigned char** payload, int* payloadlen, unsigned char* buf, int buflen)

这个方法可以实现消息内容的解析。

总之,依赖的MQTT支持库函数几乎可以使我们不用在乎协议的具体内容,就可以实现MQTT Client的功能。

STM32 W5500 MQTT Client端,我通过枚举类型给它定义三种状态

1enum MQTT_STATE {MQTT_INIT, MQTT_CONNOK, MQTT_SUBOK};

MQTT_INIT - 初始状态(MQTT未连接,未订阅,注意是MQTT的,而不是TCP连接了没有)

MQTT_CONNOK - MQTT连接成功(MQTT Client端发起MQTT连接,并接收到了broker返回连接成功)

MQTT_SUBOK - MQTT订阅成功(MQTT Client端想broker订阅消息,并受到了broker返回订阅成功)

这几种状态的关连,在程序开始执行时,MQTT Client端处于MQTT_INIT状态,或者程序执行一段时间后,MQTT PING指令发几次broker没有回复,认为MQTT Client端处于MQTT_INIT状态。

MQTT Client端处于MQTT_CONNOK 状态时可以发布数据到broker,但是无法接收来自broker的消息。

MQTT Client端处于MQTT_SUBOK 状态时可以发布数据到broker,也可以接收来自broker的消息。

如果TCP Client处于CLOSE的状态,那么MQTT Client端将处于MQTT_INIT 状态。

做好MQTT Client端的难点在于维系 TCP socket的状态与MQTT Client的状态的关系。

贴出我实现MQTT Client的c代码:

impl_mqtt.c

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366#ifndef __IMPL_MQTT_H #define __IMPL_MQTT_H #include "impl_mqtt.h" #endif int mqttstate = MQTT_INIT; int cnt_ping_not_response = 0; int cnt_sock_init = 0; u8 buf_pub[1024]; u32 ping_timestamp, now_timestamp; int func_tcp_sock_send(u8 sockno, u8 *buf_mqsend, u16 len_mqsend) {     if(getSn_SR(sockno) == SOCK_ESTABLISHED)     {         return send(sockno, buf_mqsend, len_mqsend);     }     return -1;      } int func_tcp_sock_read(u8 sockno, u8 *buf_mqrecv, u16 len_mqrecv) {     if((getSn_SR(sockno) == SOCK_ESTABLISHED))     {         len_mqrecv = getSn_RX_RSR(sockno);         if(len_mqrecv > 0)         {             return recv(sockno, buf_mqrecv, len_mqrecv);         }     }     return -1; } void func_judge_timeout_ms(u32 *timespan) {     delay_ms(1);     *timespan = *timespan + 1; } u8 func_judge_mqtt_recvmsg_package_type(u8 *buf_mqrecv, u16 len_mqrecv) {     MQTTHeader header = {0};     if(len_mqrecv > 0)     {         header.byte = buf_mqrecv[0];         return header.bits.type;     }     return 0; } void func_mqtt_client_dealwith_recvmsg(u8 sockno, u8 *buf_mqrecv, u16 len_mqbuf, u16 len_mqrecv) {     if(len_mqrecv > 0)     {         ping_timestamp = get_systick_timestamp();         // package type to deal         switch(func_judge_mqtt_recvmsg_package_type(buf_mqrecv, len_mqrecv))         {             case CONNACK:             break;             case PUBLISH://analysis msg             {                 int rc;                 u8 buf_recv[1024];                 u8* payload;                 int len_payload;                 unsigned char retained, dup;                 int qos;                 unsigned short packetid;                 MQTTString topicrecv;                 MQTTString topicpub;                 payload = buf_recv;                 rc = MQTTDeserialize_publish(&dup, &qos, &retained, &packetid, &topicrecv, &payload, &len_payload, buf_mqrecv, len_mqrecv);                 if(rc == 1)                 {                     //TODO ...                     //TEST code                     topicpub.cstring = (char*)"mytopic";                     memset(buf_pub, 0, sizeof(buf_pub));                     func_run_mqtt_publish(sockno, buf_pub, sizeof(buf_pub), topicpub, payload, len_payload);                 }             }                       break;             case PUBACK:             break;             case PUBREC://Qos2 msg receipt             case PUBREL://Qos2 msg receipt             case PUBCOMP://Qos2 msg receipt             {                 unsigned char packettype, dup;                 unsigned short packetid;                 if (MQTTDeserialize_ack(&packettype, &dup, &packetid, buf_mqrecv, len_mqbuf) == 1)                 {                     memset(buf_mqrecv, 0, len_mqbuf);                     len_mqrecv = MQTTSerialize_ack(buf_mqrecv, len_mqbuf, packettype, dup, packetid);                     if(len_mqrecv > 0)                     {                         func_tcp_sock_send(sockno, buf_mqrecv, len_mqrecv);                     }                 }             }             break;             case SUBACK:             case UNSUBACK:             case PINGREQ:             case PINGRESP:             case DISCONNECT:             break;             default:             break;         }     } } void func_mqtt_client_connect_broker(int *state, u8 sockno, u8 *buf_mqsend, u16 len_mqsend, MQTTPacket_connectData *conn_mqtt) {     u32 timespan;     int len_cont;     int res;     len_cont = MQTTSerialize_connect(buf_mqsend, len_mqsend, conn_mqtt);     if(len_cont > 0)     {         res = func_tcp_sock_send(sockno, buf_mqsend, len_cont);         if(res > 0)         {             timespan = 0;             memset(buf_mqsend, 0, len_mqsend);//reuse buffer             for(;;)             {                 if((len_cont = func_tcp_sock_read(sockno, buf_mqsend, len_mqsend)) > 3)                 {                     if(func_judge_mqtt_recvmsg_package_type(buf_mqsend, len_cont) == CONNACK)                     {                         *state = MQTT_CONNOK;                         ping_timestamp = get_systick_timestamp();                     }                     break;                 }                 func_judge_timeout_ms(×pan);                 if(timespan > 500)                 {                     break;                 }             }         }     } } void func_mqtt_client_ping_broker(int *state, u8 sockno, u8 *buf_mqsend, u16 len_mqsend) {     u32 timespan;     int len_cont;     int res;     if(get_systick_timestamp() - ping_timestamp > 5*1000)     {               len_cont = MQTTSerialize_pingreq(buf_mqsend, len_mqsend);         if(len_cont > 0)         {             res = func_tcp_sock_send(sockno, buf_mqsend, len_cont);             if(res > 0)             {                 timespan = 0;                 memset(buf_mqsend, 0, len_mqsend);//reuse buffer                 for(;;)                 {                     len_cont = func_tcp_sock_read(sockno, buf_mqsend, len_mqsend);                     // pingrsp or others' published msg                     if(len_cont > 0)                     {                         //recv pingrsp                         ping_timestamp = get_systick_timestamp();                         cnt_ping_not_response = 0;                                                 // other type msg to deal with                         func_mqtt_client_dealwith_recvmsg(sockno, buf_mqsend, len_mqsend, len_cont);                         break;                     }                                         func_judge_timeout_ms(×pan);                     if(timespan > 10)                     {                         cnt_ping_not_response ++;                         if(cnt_ping_not_response > 1)                         {                             *state = MQTT_INIT;                             close(sockno);                             cnt_ping_not_response = 0;                         }                         break;                     }                 }             }             else             {                 cnt_ping_not_response ++;                 if(cnt_ping_not_response > 2)                 {                     *state = MQTT_INIT;                     close(sockno);                     cnt_ping_not_response = 0;                 }             }         }     } } int func_run_mqtt_publish(u8 sockno, u8 *buf_mqsend, u16 len_mqsend, MQTTString topicName, u8* payload, int payloadlen) {     int len;     int rc;         if(mqttstate >= MQTT_CONNOK)     {         len = MQTTSerialize_publish(buf_mqsend, len_mqsend, 0, 0, 0, 0, topicName, payload, payloadlen);         if(len > 0)         {             memcpy(buf_pub, buf_mqsend, len);             rc = func_tcp_sock_send(sockno, buf_pub, len);             if(rc > 0)             {                 ping_timestamp = get_systick_timestamp();             }             return rc;         }     }       return 0; } int func_mqtt_client_subtopic_from_broker(u8 sockno, u8 *buf_mqsend, u16 len_mqsend, int count,\         MQTTString topicFilters[], int requestedQoSs[]) {     u32 timespan;     int len_cont;     int res;     if(mqttstate != MQTT_SUBOK)     {         len_cont = MQTTSerialize_subscribe(buf_mqsend, len_mqsend, 0, SUBSCRIBE, count, topicFilters, requestedQoSs);         if(len_cont > 0)         {             res = func_tcp_sock_send(sockno, buf_mqsend, len_cont);             if(res > 0)             {                 timespan = 0;                 memset(buf_mqsend, 0, len_mqsend);//reuse buffer                 for(;;)                 {                     if((len_cont = func_tcp_sock_read(sockno, buf_mqsend, len_mqsend)) > 0 && func_judge_mqtt_recvmsg_package_type(buf_mqsend, len_cont) == SUBACK)//nowtime, ignore other type msg                     {                         mqttstate = MQTT_SUBOK;                         ping_timestamp = get_systick_timestamp();                         return 0;                     }                     func_judge_timeout_ms(×pan);                     if(timespan > 500)                     {                         return -2;                     }                 }             }         }     }     return -1; } void func_mqtt_client_recvmsg_from_broker(u8 sockno, u8 *buf_mqsend, u16 len_mqsend) {     int len_cont;     memset(buf_mqsend, 0, len_mqsend);     len_cont = func_tcp_sock_read(sockno, buf_mqsend, len_mqsend);     if(len_cont > 0)     {         func_mqtt_client_dealwith_recvmsg(sockno, buf_mqsend, len_mqsend, len_cont);     }   } u8 func_run_mqtt_tcpsock(u8 sockno, u8 *broker_ip, u16 broker_port, u8 *buf_mqsend, u16 len_mqsend, MQTTPacket_connectData *conn_mqtt) {     static u16 any_port = 50000;     u8 res;         switch(getSn_SR(sockno))     {         case SOCK_CLOSED:         {             close(sockno);             socket(sockno, Sn_MR_TCP, any_port++, 0x00);             cnt_sock_init++;             if(cnt_sock_init > 30)             {                 cnt_sock_init = 0;                 close(sockno);                 mqttstate = MQTT_INIT;             }             if(any_port > 64000)             {                 any_port =50000;             }         }                   break;         case SOCK_INIT:         {             res = connect(sockno, broker_ip, broker_port);             if(res)             {                 //mqtt connect request                 mqttstate = func_run_mqtt_progress(mqttstate, sockno, buf_mqsend, len_mqsend, conn_mqtt);             }             else             {                 if(cnt_ping_not_response > 0)                 {                     mqttstate = MQTT_INIT;                 }             }         }         break;         case SOCK_ESTABLISHED:         {             //run mqtt progress             mqttstate = func_run_mqtt_progress(mqttstate, sockno, buf_mqsend, len_mqsend, conn_mqtt);         }         break;         case SOCK_CLOSE_WAIT:         {             mqttstate = MQTT_INIT;             close(sockno);         }         break;         default:             break;     }     return mqttstate; } u8 func_run_mqtt_progress(int state, u8 sockno, u8 *buf_mqsend, u16 len_mqsend, MQTTPacket_connectData *conn_mqtt) {     switch(state)     {         case MQTT_INIT:         {             func_mqtt_client_connect_broker(&state, sockno, buf_mqsend, len_mqsend, conn_mqtt);         }         break;         case MQTT_CONNOK:         {             func_mqtt_client_ping_broker(&state, sockno, buf_mqsend, len_mqsend);             if(state > MQTT_INIT)             { //              func_mqtt_client_subtopic_from_broker(&state, sockno, buf_mqsend, len_mqsend);             }         }         break;         case MQTT_SUBOK:         {             func_mqtt_client_ping_broker(&state, sockno, buf_mqsend, len_mqsend);             if(state != MQTT_INIT)             {                 func_mqtt_client_recvmsg_from_broker(sockno, buf_mqsend, len_mqsend);             }         }         break;         default:         break;     }     return state; }

可能不是十分完美,但是一般工程上使用应该问题不大,我也测试了好久。

测试的主函数,是做了MQTT的回环测试,MQTT Client端连接到broker后,发起订阅主题,并一次性订阅多个主题,分别是字符串subtopic、subtopic2、subtopic3、subtopic4。当其他客户端连接到broker后,向这四个主题发布消息,STM32 W5500 MQTT Client端接收后,会向 mytopic发布一条消息。

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144#ifndef __STM32F10X_H #define __STM32F10X_H #include "stm32f10x.h" #endif #ifndef __Z_UTIL_TIME_H #define __Z_UTIL_TIME_H #include "z_util_time.h" #endif #ifndef __Z_HARDWARE_LED_H #define __Z_HARDWARE_LED_H #include "z_hardware_led.h" #endif #ifndef __Z_HARDWARE_SPI_H #define __Z_HARDWARE_SPI_H #include "z_hardware_spi.h" #endif #ifndef __W5500_H #define __W5500_H #include "w5500.h" #endif #ifndef __SOCKET_H #define __SOCKET_H #include "socket.h" #endif #ifndef __W5500_CONF_H #define __W5500_CONF_H #include "w5500_conf.h" #endif #ifndef __DHCP_H #define __DHCP_H #include "dhcp.h" #endif #ifndef __Z_HARDWARE_USART2_H #define __Z_HARDWARE_USART2_H #include "z_hardware_usart2.h" #endif #include "MQTTPacket.h" #ifndef __IMPL_MQTT_H #define __IMPL_MQTT_H #include "impl_mqtt.h" #endif int main(void) {     u32 dhcp_timestamp;     u8 ip_broker[] = {192, 168, 1, 127};     u16 port_broker = 1883;     u8 buf_mqtt_send[1024];     u8 mac[6]={0, };     DHCP_Get dhcp_get;     int mqtt_stat;         MQTTString sub_topic = MQTTString_initializer;     MQTTString sub_topic2 = MQTTString_initializer;     MQTTString sub_topic3 = MQTTString_initializer;     MQTTString sub_topic4 = MQTTString_initializer;     MQTTString sub_topics[4];     int nums_sub_topic_qoss[4] = {0, };     char stpc_str[64] = {'t', 'c'};         MQTTPacket_connectData conn_mqtt = MQTTPacket_connectData_initializer;     conn_mqtt.willFlag = 0;     conn_mqtt.MQTTVersion = 3;     conn_mqtt.clientID.cstring = (char*)"dev_abcdef";     conn_mqtt.username.cstring = (char*)"abcdef";     conn_mqtt.password.cstring = (char*)"123456";     conn_mqtt.keepAliveInterval = 60;     conn_mqtt.cleansession = 1;         systick_configuration();     init_led();         init_system_spi();     func_w5500_reset();         init_hardware_usart2_dma(9600);         getMacByLockCode(mac);     setSHAR(mac);         sysinit(txsize, rxsize);     setRTR(2000);   setRCR(3);         //DHCP     for(;func_dhcp_get_ip_sub_gw(1, mac, &dhcp_get, 500) != 0;);        if(func_dhcp_get_ip_sub_gw(1, mac, &dhcp_get, 500) == 0)     {         setSUBR(dhcp_get.sub);         setGAR(dhcp_get.gw);         setSIPR(dhcp_get.lip);         close(1);     }     dhcp_timestamp = get_systick_timestamp();         memcpy(stpc_str, (char*)"subtopic", strlen("subtopic"));     sub_topic.cstring = stpc_str;     sub_topics[0] = sub_topic;     sub_topic2.cstring = "subtopic2";     sub_topics[1] = sub_topic2;     sub_topic3.cstring = "subtopic3";     sub_topics[2] = sub_topic3;     sub_topic4.cstring = "subtopic4";     sub_topics[3] = sub_topic4;         for(;;)     {         if(get_systick_timestamp() - dhcp_timestamp > 59*1000)// 1 min dhcp         {             dhcp_timestamp = get_systick_timestamp();             if(func_dhcp_get_ip_sub_gw(1, mac, &dhcp_get, 500) == 0)             {                 setSUBR(dhcp_get.sub);                 setGAR(dhcp_get.gw);                 setSIPR(dhcp_get.lip);                 close(1);             }         }         mqtt_stat = func_run_mqtt_tcpsock(2, ip_broker, port_broker, buf_mqtt_send, sizeof(buf_mqtt_send), &conn_mqtt);         if(mqtt_stat >= MQTT_CONNOK)         {             if(mqtt_stat == MQTT_CONNOK)             {                 memset(buf_mqtt_send, 0, sizeof(buf_mqtt_send));                 func_mqtt_client_subtopic_from_broker(2, buf_mqtt_send, sizeof(buf_mqtt_send), 4,   sub_topics, nums_sub_topic_qoss);             }             func_led1_toggle();         }               delay_ms(500);                     } }

电脑端使用MQTT.fx工具进行测试,测试效果

目前测试还比较稳定,支持热插拔网线,以及路由器断网后再次联网,MQTT Client仍可继续连接broker。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3