Morse Micro IoT SDK  2.10.4
mqttdemo.c
Go to the documentation of this file.
/*
 * Copyright 2023 Morse Micro
 *
 * SPDX-License-Identifier: Apache-2.0
 */
6
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include "mmosal.h"
#include "mmwlan.h"
#include "mmconfig.h"
#include "mmipal.h"
#include "mbedtls/build_info.h"
#include "mbedtls/platform.h"
#include "mbedtls/net.h"
#include "mbedtls/ssl.h"
#include "mbedtls/entropy.h"
#include "mbedtls/ctr_drbg.h"
#include "mbedtls/debug.h"
#include "core_mqtt.h"
#include "mm_app_common.h"
109
/** The MQTT client identifier used in this example. The @c %s is filled in with the
 *  MAC address string of the device. */
#define CLIENT_ID_PREFIX "MM_Client_%s"

/** Broker address to connect to. Used as the default when the @c mqtt.server config
 *  key is not read from the config store. */
#define MQTT_BROKER_ENDPOINT "test.mosquitto.org"

/** Broker port. Used as the default when the @c mqtt.port config key is not read. */
#define MQTT_BROKER_PORT 1883

/** Keep alive delay, in seconds. */
#define KEEP_ALIVE_TIMEOUT_SECONDS 60
/** Receive timeout, in milliseconds. */
#define MQTT_CONNACK_RECV_TIMEOUT_MS 10000

/** Delay in ms between publishes. Used as the default when the
 *  @c mqtt.publish_delay config key is not read. */
#define DELAY_BETWEEN_PUBLISHES 1000

/** Number of topics we subscribe to. */
#define TOPIC_COUNT 1

/** Topic to publish/subscribe, we include the client ID to keep it unique. */
#define TOPIC_FORMAT "/MorseMicro/%s/topic"

/** Message to publish/subscribe. */
#define EXAMPLE_MESSAGE "G'day World!"
/** Length of MAC address string (i.e., "XX:XX:XX:XX:XX:XX") including terminator. */
#define MAC_ADDR_STR_LEN (18)

/** Statically allocated buffer for MQTT. */
static unsigned char buf[1024];
154
160static void MQTTProcessIncomingPublish(MQTTPublishInfo_t *pxPublishInfo)
161{
162 /* Strings are not zero terminated, so we need to explicitly copy and terminate them */
163 static char tmptopic[80];
164 static char tmppayload[128];
165 size_t topic_name_length;
166 size_t payload_length;
167
168 topic_name_length = pxPublishInfo->topicNameLength;
169 if (topic_name_length >= sizeof(tmptopic))
170 {
171 topic_name_length = sizeof(tmptopic) - 1;
172 }
173 strncpy(tmptopic, pxPublishInfo->pTopicName, topic_name_length);
174 tmptopic[topic_name_length] = '\0';
175
176 payload_length = pxPublishInfo->payloadLength;
177 if (payload_length >= sizeof(tmppayload))
178 {
179 payload_length = sizeof(tmppayload) - 1;
180 }
181 strncpy(tmppayload, (char *)pxPublishInfo->pPayload, payload_length);
182 tmppayload[payload_length] = '\0';
183
184 printf("Incoming Topic: %s\n"
185 "Incoming Message : %s\n",
186 tmptopic,
187 tmppayload);
188}
189
195static void MQTTProcessResponse(MQTTPacketInfo_t *pxIncomingPacket, uint16_t usPacketId)
196{
197 MQTTStatus_t xResult = MQTTSuccess;
198 uint8_t *pucPayload = NULL;
199 size_t ulSize = 0;
200
201 (void)usPacketId;
202
203 switch (pxIncomingPacket->type)
204 {
205 case MQTT_PACKET_TYPE_SUBACK:
206 /* A SUBACK from the broker, containing the server response to our
207 * subscription request, has been received. It contains the status
208 * code indicating server approval/rejection for the subscription to
209 * the single topic requested. The SUBACK will be parsed to obtain
210 * the status code, and this status code will be stored in
211 * #xTopicFilterContext. */
212 xResult = MQTT_GetSubAckStatusCodes(pxIncomingPacket, &pucPayload, &ulSize);
213
214 /* MQTT_GetSubAckStatusCodes always returns success if called with
215 * packet info from the event callback and non-NULL parameters. */
216 MMOSAL_ASSERT(xResult == MQTTSuccess);
217 break;
218
219 case MQTT_PACKET_TYPE_UNSUBACK:
220 /* We should check which topic was unsubscribed to by looking at the packetid */
221 printf("Unsubscribed from requested topic\n");
222 break;
223
224 case MQTT_PACKET_TYPE_PINGRESP:
225 /* Nothing to be done from application as library handles
226 * PINGRESP with the use of MQTT_ProcessLoop API function. */
227 printf("WARNING: PINGRESP should not be handled by the application "
228 "callback when using MQTT_ProcessLoop.\n");
229 break;
230
231 /* Any other packet type is invalid. */
232 default:
233 printf("MQTTProcessResponse() called with unknown packet type:(%02X).\n",
234 pxIncomingPacket->type);
235 }
236}
237
244static void EventCallback(MQTTContext_t *pxMQTTContext,
245 MQTTPacketInfo_t *pxPacketInfo,
246 MQTTDeserializedInfo_t *pxDeserializedInfo)
247{
248 /* The MQTT context is not used for this demo. */
249 (void)pxMQTTContext;
250
251 if ((pxPacketInfo->type & 0xF0U) == MQTT_PACKET_TYPE_PUBLISH)
252 {
253 MQTTProcessIncomingPublish(pxDeserializedInfo->pPublishInfo);
254 }
255 else
256 {
257 MQTTProcessResponse(pxPacketInfo, pxDeserializedInfo->packetIdentifier);
258 }
259}
260
268MQTTStatus_t CreateMQTTConnectionToBroker(MQTTContext_t *pxMQTTContext,
269 NetworkContext_t *pxNetworkContext,
270 char *clientID)
271{
272 MQTTStatus_t xResult;
273 MQTTConnectInfo_t xConnectInfo;
274 bool xSessionPresent;
275 TransportInterface_t xTransport;
276 MQTTFixedBuffer_t xBuffer;
277
278 xBuffer.pBuffer = buf;
279 xBuffer.size = sizeof(buf);
280
281 /* Fill in Transport Interface send and receive function pointers. */
282 memset(&xTransport, 0, sizeof(xTransport));
283 xTransport.pNetworkContext = pxNetworkContext;
284 xTransport.send = transport_send;
285 xTransport.recv = transport_recv;
286
287 /* Initialize MQTT library. */
288 xResult = MQTT_Init(pxMQTTContext, &xTransport, mmosal_get_time_ms, EventCallback, &xBuffer);
289 if (xResult != MQTTSuccess)
290 {
291 return xResult;
292 }
293
294 /* Many fields not used in this demo so start with everything at 0. */
295 (void)memset((void *)&xConnectInfo, 0x00, sizeof(xConnectInfo));
296
297 /* Start with a clean session i.e. direct the MQTT broker to discard any
298 * previous session data. Also, establishing a connection with clean
299 * session will ensure that the broker does not store any data when this
300 * client gets disconnected. */
301 xConnectInfo.cleanSession = true;
302
303 /* The client identifier is used to uniquely identify this MQTT client to
304 * the MQTT broker. In a production device the identifier can be something
305 * unique, such as a device serial number. */
306 xConnectInfo.pClientIdentifier = clientID;
307 xConnectInfo.clientIdentifierLength = (uint16_t)strlen(clientID);
308
309 /* Set MQTT keep-alive period. It is the responsibility of the application
310 * to ensure that the interval between Control Packets being sent does not
311 * exceed the Keep Alive value. In the absence of sending any other
312 * Control Packets, the Client MUST send a PINGREQ Packet. */
313 xConnectInfo.keepAliveSeconds = KEEP_ALIVE_TIMEOUT_SECONDS;
314
315 /* Send MQTT CONNECT packet to broker. LWT is not used in this demo, so it
316 * is passed as NULL. */
317 xResult = MQTT_Connect(pxMQTTContext,
318 &xConnectInfo,
319 NULL,
321 &xSessionPresent);
322 return xResult;
323}
324
331MQTTStatus_t MQTTSubscribe(MQTTContext_t *pxMQTTContext, const char *topic)
332{
333 MQTTStatus_t xResult = MQTTSuccess;
334 MQTTSubscribeInfo_t xMQTTSubscription[TOPIC_COUNT];
335 uint16_t usSubscribePacketIdentifier;
336
337 /* Some fields not used by this demo so start with everything at 0. */
338 (void)memset((void *)&xMQTTSubscription, 0x00, sizeof(xMQTTSubscription));
339
340 /* Each packet requires a unique ID. */
341 usSubscribePacketIdentifier = MQTT_GetPacketId(pxMQTTContext);
342
343 /* Subscribe to the pcExampleTopic topic filter. This example subscribes
344 * to only one topic and uses QoS0. */
345 xMQTTSubscription[0].qos = MQTTQoS0;
346 xMQTTSubscription[0].pTopicFilter = topic;
347 xMQTTSubscription[0].topicFilterLength = strlen(topic);
348
349 /* The client is already connected to the broker. Subscribe to the topic
350 * as specified in pcExampleTopic by sending a subscribe packet then
351 * waiting for a subscribe acknowledgment (SUBACK). */
352 xResult = MQTT_Subscribe(pxMQTTContext,
353 xMQTTSubscription,
354 1, /* Only subscribing to one topic. */
355 usSubscribePacketIdentifier);
356 if (xResult != MQTTSuccess)
357 {
358 return xResult;
359 }
360
361 /* Process incoming packet from the broker. After sending the
362 * subscribe, the client may receive a publish before it receives a
363 * subscribe ack. Therefore, call generic incoming packet processing
364 * function. Since this demo is subscribing to the topic to which no
365 * one is publishing, probability of receiving Publish message before
366 * subscribe ack is zero; but application must be ready to receive any
367 * packet. This demo uses the generic packet processing function
368 * everywhere to highlight this fact. Note there is a separate demo that
369 * shows how to use coreMQTT in a thread safe way – in which case the
370 * MQTT protocol runs in the background and this call is not required. */
371 xResult = MQTT_ProcessLoop(pxMQTTContext);
372 return xResult;
373}
374
381MQTTStatus_t MQTTUnsubscribeFromTopic(MQTTContext_t *pxMQTTContext, const char *topic)
382{
383 MQTTStatus_t xResult;
384 MQTTSubscribeInfo_t xMQTTSubscription[TOPIC_COUNT];
385 uint16_t usUnsubscribePacketIdentifier;
386
387 /* Some fields not used by this demo so start with everything at 0. */
388 (void)memset((void *)&xMQTTSubscription, 0x00, sizeof(xMQTTSubscription));
389
390 /* Subscribe to the pcExampleTopic topic filter. This example subscribes
391 * to only one topic and uses QoS0. */
392 xMQTTSubscription[0].qos = MQTTQoS0;
393 xMQTTSubscription[0].pTopicFilter = topic;
394 xMQTTSubscription[0].topicFilterLength = (uint16_t)strlen(topic);
395
396 /* Each packet requires a unique ID. */
397 usUnsubscribePacketIdentifier = MQTT_GetPacketId(pxMQTTContext);
398
399 /* Send UNSUBSCRIBE packet. */
400 xResult = MQTT_Unsubscribe(pxMQTTContext,
401 xMQTTSubscription,
402 sizeof(xMQTTSubscription) / sizeof(MQTTSubscribeInfo_t),
403 usUnsubscribePacketIdentifier);
404
405 return xResult;
406}
407
416MQTTStatus_t MQTTPublishToTopic(MQTTContext_t *pxMQTTContext,
417 const char *topic,
418 void *payload,
419 size_t payloadLength)
420{
421 MQTTStatus_t xResult;
422 MQTTPublishInfo_t xMQTTPublishInfo;
423
424 /* Some fields are not used by this demo so start with everything at 0. */
425 (void)memset((void *)&xMQTTPublishInfo, 0x00, sizeof(xMQTTPublishInfo));
426
427 /* This demo uses QoS0. */
428 xMQTTPublishInfo.qos = MQTTQoS0;
429 xMQTTPublishInfo.retain = false;
430 xMQTTPublishInfo.pTopicName = topic;
431 xMQTTPublishInfo.topicNameLength = (uint16_t)strlen(topic);
432 xMQTTPublishInfo.pPayload = payload;
433 xMQTTPublishInfo.payloadLength = payloadLength;
434
435 /* Send PUBLISH packet. Packet ID is not used for a QoS0 publish. */
436 xResult = MQTT_Publish(pxMQTTContext, &xMQTTPublishInfo, 0U);
437 return xResult;
438}
439
444void app_init(void)
445{
446 printf("\n\nMorse MQTT Demo (Built " __DATE__ " " __TIME__ ")\n\n");
447
448 /* Initialize and connect to Wi-Fi, blocks till connected */
451
452 uint32_t ulPublishCount = 0U;
453 const uint32_t ulMaxPublishCount = 5UL;
454 NetworkContext_t xNetworkContext = { 0 };
455 MQTTContext_t xMQTTContext;
456 MQTTStatus_t xMQTTStatus;
457 TransportStatus_t xNetworkStatus;
458
459 /* Save space on stack by allocating static, no need to make this global */
460 static char client_id[48];
461 static char topic[80];
462 static char server[80];
463 static char message[80];
464 uint32_t port = MQTT_BROKER_PORT;
465 uint32_t publish_delay = DELAY_BETWEEN_PUBLISHES;
466
467 /* Generate Client ID & topic from MAC */
468 uint8_t mac_addr[MMWLAN_MAC_ADDR_LEN] = { 0 };
469 char mac_address_str[MAC_ADDR_STR_LEN];
470 enum mmwlan_status status = mmwlan_get_mac_addr(mac_addr);
471 if (status != MMWLAN_SUCCESS)
472 {
473 printf("Failed to read MAC address (status code %d)\n", status);
474 return;
475 }
476 snprintf(mac_address_str,
477 sizeof(mac_address_str),
478 "%02x:%02x:%02x:%02x:%02x:%02x",
479 mac_addr[0],
480 mac_addr[1],
481 mac_addr[2],
482 mac_addr[3],
483 mac_addr[4],
484 mac_addr[5]);
485 snprintf(client_id, sizeof(client_id), CLIENT_ID_PREFIX, mac_address_str);
486 snprintf(topic, sizeof(topic), TOPIC_FORMAT, client_id);
487
488 /* Read from config store */
489 (void)mmconfig_read_string("mqtt.clientid", client_id, sizeof(client_id));
490 (void)mmconfig_read_string("mqtt.topic", topic, sizeof(topic));
491 (void)mmconfig_read_uint32("mqtt.port", &port);
492 (void)mmconfig_read_uint32("mqtt.publish_delay", &publish_delay);
493
494 strncpy(server, MQTT_BROKER_ENDPOINT, sizeof(server));
495 (void)mmconfig_read_string("mqtt.server", server, sizeof(server));
496
497 strncpy(message, EXAMPLE_MESSAGE, sizeof(message));
498 (void)mmconfig_read_string("mqtt.message", message, sizeof(message));
499
500 /*************************** Connect. *********************************/
501
502 /* Attempt to connect to the MQTT broker. The socket is returned in
503 * the network context structure. We set NetworkCredentials to NULL to connect in the clear.
504 * Set this parameter if you wish to connect with TLS */
505 printf("Connecting to server socket on %s:%ld...", server, port);
506 xNetworkStatus = transport_connect(&xNetworkContext, server, (uint16_t)port, NULL);
507 if (xNetworkStatus != TRANSPORT_SUCCESS)
508 {
509 printf("failed with code %d\n", xNetworkStatus);
510 return;
511 }
512 printf("ok\n");
513
514 /* Connect to the MQTT broker using the already connected TCP socket. */
515 printf("Client %s Creating MQTT connection with broker....", client_id);
516 xMQTTStatus = CreateMQTTConnectionToBroker(&xMQTTContext, &xNetworkContext, client_id);
517 if (xMQTTStatus != MQTTSuccess)
518 {
519 printf("failed with code %d\n", xMQTTStatus);
520 transport_disconnect(&xNetworkContext);
521 return;
522 }
523 printf("ok\n");
524
525 /**************************** Subscribe. ******************************/
526
527 /* Subscribe to the test topic. */
528 printf("Subscribing to topic %s...", topic);
529 xMQTTStatus = MQTTSubscribe(&xMQTTContext, topic);
530 if (xMQTTStatus != MQTTSuccess)
531 {
532 printf("failed with code %d\n", xMQTTStatus);
533 goto quit;
534 }
535 printf("ok\n");
536
537 /******************* Publish and Keep Alive Loop. *********************/
538
539 /* Publish messages with QoS0, then send and process Keep Alive messages. */
540 for (ulPublishCount = 0; ulPublishCount < ulMaxPublishCount; ulPublishCount++)
541 {
542 printf("Publishing to topic %s...", topic);
543 xMQTTStatus = MQTTPublishToTopic(&xMQTTContext, topic, message, strlen(message));
544 if (xMQTTStatus != MQTTSuccess)
545 {
546 printf("failed with code %d\n", xMQTTStatus);
547 goto quit;
548 }
549 printf("ok\n");
550
551 /* Process the incoming publish echo. Since the application subscribed
552 * to the same topic, the broker will send the same publish message
553 * back to the application. Note there is a separate demo that
554 * shows how to use coreMQTT in a thread safe way - in which case the
555 * MQTT protocol runs in the background and this call is not
556 * required. */
557 xMQTTStatus = MQTT_ProcessLoop(&xMQTTContext);
558 if (xMQTTStatus != MQTTSuccess)
559 {
560 printf("MQTT_ProcessLoop() failed with code %d\n", xMQTTStatus);
561 goto quit;
562 }
563
564 /* Leave the connection idle for some time. */
565 mmosal_task_sleep(publish_delay);
566 }
567
568 /******************** Unsubscribe from the topic. *********************/
569
570 xMQTTStatus = MQTTUnsubscribeFromTopic(&xMQTTContext, topic);
571 if (xMQTTStatus != MQTTSuccess)
572 {
573 printf("MQTTUnsubscribeFromTopic() failed with code %d\n", xMQTTStatus);
574 goto quit;
575 }
576
577 /* Process the incoming packet from the broker. Note there is a separate
578 * demo that shows how to use coreMQTT in a thread safe way - in which case
579 * the MQTT protocol runs in the background and this call is not required. */
580 xMQTTStatus = MQTT_ProcessLoop(&xMQTTContext);
581 if (xMQTTStatus != MQTTSuccess)
582 {
583 printf("MQTT_ProcessLoop() failed with code %d\n", xMQTTStatus);
584 goto quit;
585 }
586
587 /**************************** Disconnect. *****************************/
588
589quit:
590 /* Disconnect from broker. */
591 printf("Disconnecting from server and closing socket.\n");
592 xMQTTStatus = MQTT_Disconnect(&xMQTTContext);
593 if (xMQTTStatus != MQTTSuccess)
594 {
595 printf("MQTT_Disconnect() failed with code %d\n", xMQTTStatus);
596 }
597
598 /* Close the network connection. */
599 transport_disconnect(&xNetworkContext);
600}
int mmconfig_read_string(const char *key, char *buffer, int bufsize)
Returns the persistent store string value identified by the key.
int mmconfig_read_uint32(const char *key, uint32_t *value)
Returns the unsigned integer stored in persistent store identified by the key.
#define MMOSAL_ASSERT(expr)
Assert that the given expression evaluates to true and abort execution if not.
Definition: mmosal.h:934
void mmosal_task_sleep(uint32_t duration_ms)
Sleep for a period of time, yielding during that time.
uint32_t mmosal_get_time_ms(void)
Get the system time in milliseconds.
static enum mmwlan_status mmwlan_get_mac_addr(uint8_t *mac_addr)
Gets the MAC address of the STA interface.
Definition: mmwlan.h:681
mmwlan_status
Enumeration of status return codes.
Definition: mmwlan.h:51
#define MMWLAN_MAC_ADDR_LEN
Length of a WLAN MAC address.
Definition: mmwlan.h:92
@ MMWLAN_SUCCESS
The operation was successful.
Definition: mmwlan.h:53
Morse Micro application helper routines for initializing/de-initializing the Wireless LAN interface a...
void app_wlan_init(void)
Initializes the WLAN interface (and dependencies) using settings specified in the config store.
void app_wlan_start(void)
Starts the WLAN interface and connects to Wi-Fi using settings specified in the config store.
#define MAC_ADDR_STR_LEN
Length of MAC address string (i.e., "XX:XX:XX:XX:XX:XX") including terminator.
Definition: mqttdemo.c:150
#define EXAMPLE_MESSAGE
Message to publish/subscribe.
Definition: mqttdemo.c:148
MQTTStatus_t MQTTSubscribe(MQTTContext_t *pxMQTTContext, const char *topic)
Subscribes to the specified topic.
Definition: mqttdemo.c:331
MQTTStatus_t MQTTUnsubscribeFromTopic(MQTTContext_t *pxMQTTContext, const char *topic)
Unsubscribes from the specified topic.
Definition: mqttdemo.c:381
#define MQTT_CONNACK_RECV_TIMEOUT_MS
Receive timeout.
Definition: mqttdemo.c:132
#define DELAY_BETWEEN_PUBLISHES
Delay in ms between publishes.
Definition: mqttdemo.c:139
static void MQTTProcessIncomingPublish(MQTTPublishInfo_t *pxPublishInfo)
This callback gets called when a published message matches one of our subscribed topics.
Definition: mqttdemo.c:160
MQTTStatus_t MQTTPublishToTopic(MQTTContext_t *pxMQTTContext, const char *topic, void *payload, size_t payloadLength)
Publish a message to the specified MQTT topic.
Definition: mqttdemo.c:416
static void EventCallback(MQTTContext_t *pxMQTTContext, MQTTPacketInfo_t *pxPacketInfo, MQTTDeserializedInfo_t *pxDeserializedInfo)
This is a callback from MQTT_Process whenever a packet is received from the server.
Definition: mqttdemo.c:244
#define TOPIC_COUNT
Number of topics we subscribe to.
Definition: mqttdemo.c:142
#define KEEP_ALIVE_TIMEOUT_SECONDS
Keep alive Delay.
Definition: mqttdemo.c:130
MQTTStatus_t CreateMQTTConnectionToBroker(MQTTContext_t *pxMQTTContext, NetworkContext_t *pxNetworkContext, char *clientID)
Initializes an MQTT connection with the server.
Definition: mqttdemo.c:268
static unsigned char buf[1024]
Statically allocated buffer for MQTT.
Definition: mqttdemo.c:153
#define MQTT_BROKER_PORT
Broker port.
Definition: mqttdemo.c:127
#define CLIENT_ID_PREFIX
The MQTT client identifier used in this example.
Definition: mqttdemo.c:115
#define TOPIC_FORMAT
Topic to publish/subscribe, we include the client ID to keep it unique.
Definition: mqttdemo.c:145
#define MQTT_BROKER_ENDPOINT
Broker address to connect to.
Definition: mqttdemo.c:122
void app_init(void)
Main entry point to the application.
Definition: mqttdemo.c:444
static void MQTTProcessResponse(MQTTPacketInfo_t *pxIncomingPacket, uint16_t usPacketId)
This callback gets called whenever we receive an ACK from the server.
Definition: mqttdemo.c:195