kawaiimqtt:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 /* 5. wifi connect */
2 rt_wlan_connect(WIFI_SSID, WIFI_KEY);
3
4 /* 6. startup mqtt client */
5 mqtt_log_init();
6 rt_snprintf(cid, sizeof(cid), "rtthread%d", rt_tick_get());
7 /* check network connection status */
8
9 net_dev = netdev_get_by_name("w0");
10 while(!(netdev_is_internet_up(net_dev)))
11 {
12 rt_thread_mdelay(100);
13 timeout++;
14 if(timeout == 200)
15 {
16 rt_kprintf("wifi connect failed!\r\n");
17 return -RT_ERROR;
18 }
19 }
20
21 client = mqtt_lease();
22
23 mqtt_set_host(client, MQTT_URL);
24 mqtt_set_port(client, MQTT_PORT);
25 mqtt_set_user_name(client, "rt-thread");
26 mqtt_set_password(client, "rt-thread");
27 mqtt_set_client_id(client, cid);
28 mqtt_set_clean_session(client, 1);
29
30 if(mqtt_connect(client))
31 {
32 KAWAII_MQTT_LOG_E("%s:%d %s()... mqtt connect failed...", __FILE__, __LINE__, __FUNCTION__);
33 is_started = 0;
34 return -RT_ERROR;
35 }
36
37 is_started = 1;
38 mqtt_subscribe(client, SUB1_NAME, QOS0, sub_topic_handle_led);
39 mqtt_subscribe(client, SUB2_NAME, QOS1, sub_topic_handle_num);
40 mqtt_subscribe(client, SUB_OTA, QOS2, sub_topic_handle_ota);
41
42 tid3 = rt_thread_create("mq_pub", mqtt_t_publish, RT_NULL, 2048, 13, 10);
43 if (tid3 != RT_NULL)
44 {
45 rt_thread_startup(tid3);
46 }
View Code
1. 保证网络连接。
2. 设置 MQTT 客户端:远端 URL、端口、客户端 ID、用户名、密码等。
3. 连接远程 MQTT 服务器。
4. 订阅主题:主题、QoS,以及收到主题消息时的响应(将主题名和主题内容分离出来,然后匹配响应)。
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
/* A counted list of C strings. */
typedef struct
{
    char   **str;   /* table of string pointers */
    size_t   num;   /* number of entries in the table */
} IString;
5
6 /* 拆分字符串 */
7 static int Split(char *src, char *delim, IString* istr)//split buf
8 {
9 int i;
10 char *str = NULL, *p = NULL;
11
12 (*istr).num = 1;
13 str = (char*)rt_calloc(strlen(src)+1,sizeof(char));
14 if (str == NULL) return 0;
15 (*istr).str = (char**)rt_calloc(1,sizeof(char *));
16 if ((*istr).str == NULL) return 0;
17 strcpy(str,src);
18
19 p = strtok(str, delim);
20 (*istr).str[0] = (char*)rt_calloc(strlen(p)+1,sizeof(char));
21 if ((*istr).str[0] == NULL) return 0;
22 strcpy((*istr).str[0],p);
23 for(i = 1; (p = strtok(NULL, delim)); i++)
24 {
25 (*istr).num++;
26 (*istr).str = (char**)rt_realloc((*istr).str,(i+1)*sizeof(char *));
27 if ((*istr).str == NULL) return 0;
28 (*istr).str[i] = (char*)rt_calloc(strlen(p)+1,sizeof(char));
29 if ((*istr).str[0] == NULL) return 0;
30 strcpy((*istr).str[i],p);
31 }
32 rt_free(str);
33 str = p = NULL;
34
35 return 1;
36 }
37 static void sub_topic_handle_led(void* client, message_data_t* msg)
38 {
39 (void) client;
40 KAWAII_MQTT_LOG_I("-----------------------------------------------------------------------------------\r\n");
41 KAWAII_MQTT_LOG_I("%s:%d %s()...\ntopic: %s\nmessage:%s\r\n", __FILE__, __LINE__, __FUNCTION__, msg->topic_name, (char*)msg->message->payload);
42 KAWAII_MQTT_LOG_I("-----------------------------------------------------------------------------------\r\n");
43
44 int i;
45 IString istr;
46 struct file_msg file_msg;
47 char *tick_num;
48
49 if (Split(msg->message->payload," ",&istr))
50 {
51 for (i = 0; i < istr.num; i++)
52 rt_kprintf("%s\n",istr.str[i]);
53
54 if(i == 2)
55 {
56 if(rt_strncmp(istr.str[0], "led", 3))
57 {
58 KAWAII_MQTT_LOG_E("command error!\r\n");
59 }
60 else
61 {
62 if(!(rt_strncmp(istr.str[1], "off", 3)))
63 {
64 rt_pin_write(LED_PIN_RED, PIN_HIGH);
65 }
66 else if(!(rt_strncmp(istr.str[1], "on", 2)))
67 {
68 rt_pin_write(LED_PIN_RED, PIN_LOW);
69 }
70 else
71 {
72 KAWAII_MQTT_LOG_E("state command error!\r\n");
73 for (i = 0; i < istr.num; i++)
74 rt_free(istr.str[i]);
75 rt_free(istr.str);
76 return;
77 }
78
79 tick_num = rt_calloc(1, 10);
80 if (tick_num == RT_NULL) {
81 rt_kprintf("memory is not enough \r\n");
82 }
83 else {
84 itoa(rt_tick_get(), tick_num, 10);
85 file_msg.timestamp = tick_num;
86 file_msg.cmd = istr.str[0];
87 file_msg.state = istr.str[1];
88 file_msg.str = istr.str;
89
90 rt_mq_send(&mq_demo, &file_msg, sizeof(file_msg));
91 }
92 }
93 }
94 else
95 {
96 KAWAII_MQTT_LOG_E("command error! \r\n");
97 for (i = 0; i < istr.num; i++)
98 rt_free(istr.str[i]);
99 rt_free(istr.str);
100 }
101
102 }
103 else
104 {
105 KAWAII_MQTT_LOG_E("Split failure!\r\n");
106 }
107 }
View Code
5. 建立发布线程用于发布主题(设置主题 QoS 和负载内容,然后向指定主题发送;若是数据采集,可以用消息队列等待发送)。
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 /* 板子发布消息的线程 */
2 static void mqtt_t_publish(void *parameter)
3 {
4 char pub_buf[64];
5 mqtt_message_t msg;
6 memset(&msg, 0, sizeof(msg));
7
8 rt_thread_mdelay(2 * RT_TICK_PER_SECOND);
9
10 while(1)
11 {
12 if(is_started)
13 {
14 memset(pub_buf, 0, 64);
15
16 sprintf(pub_buf, "%d", usMRegHoldBuf[SLAVE_ADDR - 1][MB_RECV_REG_NUM]);
17
18 msg.qos = QOS0;
19 msg.payload = pub_buf;
20
21 mqtt_publish(client, PUB_NAME, &msg);
22 }
23 rt_thread_mdelay(5 * RT_TICK_PER_SECOND);
24 }
25 }
View Code
|