MQTT-umqtt

MQTT (Message Queuing Telemetry Transport) is a machine-to-machine (M2M)/Internet of Things (IoT) connectivity protocol. It is designed for constrained devices and for low-bandwidth, high-latency, or unreliable networks. It is a lightweight communication protocol based on the publish/subscribe model and built on top of TCP/IP. It was released by IBM in 1999. At present, the latest version of the protocol is V5.0, and the most commonly used version is V3.1.1.

Figure 1-1 Communication example based on MQTT

Terminology:

  • Publisher

  • Broker

  • Subscriber

  • Topic - the topic for publish/subscribe

Process overview: In the figure above, each sensor acts as a publisher. For example, the humidity sensor and the temperature sensor periodically publish to two topics, "Moisture" and "Temp", on the MQTT Broker they are connected to. The humidity and temperature values published along with the topics are called "messages". The clients act as subscribers. For example, if the mobile app subscribes to the "Temp" topic on the Broker, it receives the temperature values that the temperature sensor publishes to the Broker.

Additional notes:

  1. The roles of publisher and subscriber are relative, not fixed. A publisher can also subscribe to topics on a broker, and a subscriber can also publish to topics on a broker. In other words, the same client can be both a publisher and a subscriber.

  2. The Broker can be an online cloud server or a device on the local LAN. Depending on requirements, the Broker itself may also subscribe to and publish topics.
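The publish/subscribe flow described above can be illustrated with a toy in-memory broker. This is only a sketch of the concept, not MQTT and not part of uMQTT: the names toy_broker, broker_subscribe, broker_publish, and remember_message are hypothetical, topics are matched by exact string comparison, and no networking is involved.

```c
#include <stddef.h>
#include <string.h>

#define MAX_SUBS 8  /* hypothetical limit for this sketch */

/* Callback invoked for every message delivered to a subscriber. */
typedef void (*message_cb)(const char *topic, const char *payload, void *user);

struct subscription {
    const char *topic;
    message_cb cb;
    void *user;
};

struct toy_broker {
    struct subscription subs[MAX_SUBS];
    size_t count;
};

/* Register interest in a topic; returns 0 on success, -1 if the table is full. */
static int broker_subscribe(struct toy_broker *b, const char *topic,
                            message_cb cb, void *user)
{
    if (b->count >= MAX_SUBS) return -1;
    b->subs[b->count].topic = topic;
    b->subs[b->count].cb = cb;
    b->subs[b->count].user = user;
    b->count++;
    return 0;
}

/* Deliver a message to every subscriber of exactly this topic.
 * Returns the number of subscribers that received it. */
static int broker_publish(const struct toy_broker *b, const char *topic,
                          const char *payload)
{
    int delivered = 0;
    for (size_t i = 0; i < b->count; i++) {
        if (strcmp(b->subs[i].topic, topic) == 0) {
            b->subs[i].cb(topic, payload, b->subs[i].user);
            delivered++;
        }
    }
    return delivered;
}

/* Example subscriber state: remembers the most recent payload. */
struct last_message {
    char payload[32];
    int count;
};

static void remember_message(const char *topic, const char *payload, void *user)
{
    struct last_message *m = (struct last_message *)user;
    (void)topic;
    strncpy(m->payload, payload, sizeof m->payload - 1);
    m->payload[sizeof m->payload - 1] = '\0';
    m->count++;
}
```

In this sketch, a "mobile app" that subscribes to "Temp" with remember_message sees only values published to "Temp"; a message published to "Moisture" is not delivered to it.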

For more reference material, see the MQTT Chinese website or the official MQTT website.


Learning objectives:

  1. Understand the MQTT protocol and its message format

  2. Understand the relationship between uMQTT and LWIP

  3. Understand the implementation principle of uMQTT

Every protocol, public or proprietary, is made up of message packets whose format is defined in advance by a set of rules, and MQTT is no exception. In the MQTT protocol, every packet consists of at most three parts: fixed header + variable header + payload, as shown in Table 2-1:

Table 2-1 MQTT data packet format

Of these, the fixed header is mandatory, while the variable header and payload are optional. In theory, therefore, the minimum length of an MQTT packet is 2 bytes, which keeps protocol overhead to a minimum. Next, we take MQTT 3.1.1 as an example and describe the format of each part of the packet in detail (for detailed usage, see [**4. Implementation of uMQTT**](#4. Implementation of uMQTT)).

Table 2-2 Fixed header format

The fixed header consists of at least two bytes, as shown in Table 2-2. The high 4 bits of the first byte give the type of the packet ([MQTT Control Packet type](#2.1.1 MQTT Control Packet type)), see Table 2-3 below; the low 4 bits hold the flags specific to that packet type ([Flags specific to each MQTT Control Packet type](#2.1.2 Flags specific to each MQTT Control Packet type)), see Table 2-4 below; the following one to four bytes encode the length in bytes of the rest of the packet ([Remaining Length](#2.1.3 Remaining Length)), see Table 2-5 below.

Table 2-3 Control Packet type

Table 2-4 Control Packet Flags

Bits [3..0] of byte 0 in the fixed header contain the flags specific to each MQTT control packet type, as shown in Table 2-4 above. Flags marked "Reserved" in the table are reserved for future versions, but they must still be set to the values listed in the table when a packet is sent. If the receiver detects a flag value that does not match the table, it must close the network connection.

Three flags are in use in version 3.1.1:

  • DUP = Duplicate delivery of a PUBLISH control packet

  • QoS = Quality of Service of a PUBLISH packet

  • RETAIN = Retain flag of a PUBLISH packet
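As a minimal sketch of the layout described above, the first byte of the fixed header can be composed from the packet type and the three PUBLISH flags. The function name fixed_header_byte0 and the PKT_* constants are hypothetical and are not part of uMQTT; the bit positions follow the MQTT 3.1.1 fixed header layout (type in bits 7..4, DUP in bit 3, QoS in bits 2..1, RETAIN in bit 0).

```c
#include <stdint.h>

/* A few control packet type values from the MQTT 3.1.1 specification. */
enum {
    PKT_CONNECT   = 1,
    PKT_PUBLISH   = 3,
    PKT_SUBSCRIBE = 8
};

/* Build byte 0 of the fixed header.
 * For packet types other than PUBLISH, the low nibble must carry the fixed
 * value the specification assigns to that type; the caller is responsible
 * for passing dup/qos/retain values that produce it. */
static uint8_t fixed_header_byte0(uint8_t type, uint8_t dup, uint8_t qos, uint8_t retain)
{
    return (uint8_t)(((type   & 0x0F) << 4) |
                     ((dup    & 0x01) << 3) |
                     ((qos    & 0x03) << 1) |
                      (retain & 0x01));
}
```

For example, a PUBLISH packet with QoS 1 and RETAIN set yields 0x33, and SUBSCRIBE, whose reserved flags are fixed at 0010, yields 0x82.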

Table 2-5 Remaining Length

The Remaining Length is the number of bytes remaining in the current packet, including the variable header and the payload. It does not include the bytes used to encode the Remaining Length field itself; in other words, it excludes the fixed header.

The Remaining Length field uses a variable-length encoding scheme, as shown in Table 2-5 above:

  • Values less than 128 are encoded in a single byte.

  • Larger values are split into 7-bit groups: the least significant 7 bits of each byte carry data, and the most significant bit is a continuation bit that indicates whether more bytes follow. Each byte therefore encodes 128 values plus a continuation bit. The continuation bit only signals that another byte follows and does not contribute to the length value.

  • The field is at most 4 bytes long.
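The encoding rules above can be sketched as a pair of helper functions. This is an illustrative implementation of the scheme in the MQTT 3.1.1 specification, not the uMQTT source; the names remaining_length_encode and remaining_length_decode are hypothetical.

```c
#include <stddef.h>
#include <stdint.h>

#define REMAINING_LENGTH_MAX 268435455UL  /* largest value that fits in 4 bytes */

/* Encode `length` into `buf` (which must hold at least 4 bytes).
 * Returns the number of bytes written (1..4), or 0 if the value is too large. */
static size_t remaining_length_encode(uint8_t *buf, uint32_t length)
{
    size_t n = 0;

    if (length > REMAINING_LENGTH_MAX) return 0;
    do {
        uint8_t byte = (uint8_t)(length % 128);  /* low 7 bits carry data */
        length /= 128;
        if (length > 0) byte |= 0x80;            /* continuation bit: more bytes follow */
        buf[n++] = byte;
    } while (length > 0);
    return n;
}

/* Decode a Remaining Length from `buf`.
 * Returns the number of bytes consumed (1..4), or 0 if the input is
 * truncated or uses more than 4 bytes. */
static size_t remaining_length_decode(const uint8_t *buf, size_t buf_len, uint32_t *length)
{
    uint32_t value = 0;
    uint32_t multiplier = 1;
    size_t n = 0;
    uint8_t byte;

    do {
        if (n >= 4 || n >= buf_len) return 0;
        byte = buf[n++];
        value += (uint32_t)(byte & 0x7F) * multiplier;
        multiplier *= 128;
    } while (byte & 0x80);

    *length = value;
    return n;
}
```

For example, the value 321 (2 × 128 + 65) is encoded as the two bytes 0xC1 0x02: the first byte carries 65 with the continuation bit set, and the second carries 2.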

Some MQTT control packets contain a variable header part, which is located between the fixed header and the payload. The content of the variable header varies depending on the control packet type ([MQTT Control Packet type](#2.1.1 MQTT Control Packet type)); for example, for a CONNECT type packet, its variable header contains four fields: "Protocol Name", "Protocol Level", "Connect Flags" and "Keep Alive". For other variable headers, see [MQTT Message Format](#3. MQTT Message Format).

Although variable header contents differ between packet types, the Packet Identifier is an important common field and is subject to the following uniqueness rules:

Table 2-6 Packet Identifier

This common field appears in many packet types; see Table 2-7:

Table 2-7 Control Packets that contain a Packet Identifier

The rules for the Packet Identifier are as follows:

  • SUBSCRIBE, UNSUBSCRIBE, and PUBLISH (when QoS > 0) control packets MUST contain a non-zero 16-bit Packet Identifier.

  • Each time a client sends a new packet of one of these types, it MUST assign it a Packet Identifier that is not currently in use.

  • If a client resends a particular control packet, it MUST use the same Packet Identifier in every retransmission of that packet. The Packet Identifier becomes available for reuse after the client has processed the corresponding acknowledgement packet:

    • For a QoS 1 PUBLISH, the acknowledgement is PUBACK.

    • For a QoS 2 PUBLISH, the acknowledgement is PUBCOMP.

    • For SUBSCRIBE and UNSUBSCRIBE, the acknowledgements are SUBACK and UNSUBACK respectively.

  • The same rules apply when the server sends a PUBLISH packet with QoS > 0.

  • A PUBLISH packet with QoS = 0 MUST NOT contain a Packet Identifier.

  • PUBACK, PUBREC, and PUBREL packets MUST contain the same Packet Identifier as the PUBLISH packet originally sent.

  • Similarly, SUBACK and UNSUBACK packets MUST contain the same Packet Identifier as the corresponding SUBSCRIBE and UNSUBSCRIBE packets.

The client and server assign Packet Identifiers independently of each other. As a result, a client and server can take part in concurrent message exchanges that use the same Packet Identifier.
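One way to satisfy the "non-zero and not currently in use" rules is a small pool that tracks in-flight identifiers. The sketch below is an assumption for illustration only and is not how uMQTT manages its packet_id field; packet_id_pool, packet_id_acquire, packet_id_release, and MAX_INFLIGHT are hypothetical names.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define MAX_INFLIGHT 8  /* hypothetical cap on unacknowledged packets */

struct packet_id_pool {
    uint16_t last_id;                 /* most recently issued identifier */
    uint16_t inflight[MAX_INFLIGHT];  /* identifiers awaiting an ACK; 0 = free slot */
};

static bool packet_id_in_use(const struct packet_id_pool *pool, uint16_t id)
{
    for (size_t i = 0; i < MAX_INFLIGHT; i++) {
        if (pool->inflight[i] == id) return true;
    }
    return false;
}

/* Issue a non-zero identifier that is not in flight.
 * Returns 0 if too many packets are already awaiting acknowledgement. */
static uint16_t packet_id_acquire(struct packet_id_pool *pool)
{
    size_t slot = MAX_INFLIGHT;
    uint16_t id;

    for (size_t i = 0; i < MAX_INFLIGHT; i++) {
        if (pool->inflight[i] == 0) { slot = i; break; }
    }
    if (slot == MAX_INFLIGHT) return 0;

    id = pool->last_id;
    do {
        id = (uint16_t)(id + 1);
        if (id == 0) id = 1;          /* zero is not a valid Packet Identifier */
    } while (packet_id_in_use(pool, id));

    pool->inflight[slot] = id;
    pool->last_id = id;
    return id;
}

/* Call once the matching PUBACK, PUBCOMP, SUBACK, or UNSUBACK has been processed. */
static void packet_id_release(struct packet_id_pool *pool, uint16_t id)
{
    if (id == 0) return;
    for (size_t i = 0; i < MAX_INFLIGHT; i++) {
        if (pool->inflight[i] == id) { pool->inflight[i] = 0; return; }
    }
}
```

A retransmission would reuse the identifier already stored in the pool rather than calling packet_id_acquire again.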

Some MQTT control packets carry a payload as the final part of the packet. For details, see [MQTT message format](#3. MQTT message format). For a PUBLISH packet, for example, the payload is the application message. Table 2-8 lists the control packets that contain a payload.

Table 2-8 Control Packets that contain a Payload

Since there are many formats for Variable header and Payload, this article will not go into details. For details, please refer to the following links:

The uMQTT software package is a client implementation of the MQTT 3.1.1 protocol developed independently by RT-Thread. It provides the basic functions a device needs to communicate with an MQTT Broker. The package is hosted on GitHub and Gitee.

The uMQTT package features are as follows:

  • Implements the basic connect, subscribe, and publish functions;

  • Provides multiple heartbeat keep-alive and device reconnection mechanisms to keep the MQTT connection online in complex network conditions;

  • Supports three quality-of-service levels: QoS=0, QoS=1, and QoS=2;

  • Supports multiple clients;

  • Offers a simple user-side interface with multiple callback functions;

  • Exposes multiple configurable parameters, making it easy to use and convenient for product development;

  • Is feature-rich with low resource usage, and its features can be trimmed as needed.

The uMQTT software package implements the MQTT protocol on embedded devices, and most of its work centers on the protocol itself. The package structure is shown in Figure 4-1:

Figure 4-1 uMQTT structure diagram

The main tasks in the software package implementation process are:

  1. Encapsulate and unpack packet data according to the MQTT 3.1.1 protocol;

  2. Adapt the transport layer functions to the SAL (Socket Abstraction Layer);

  3. Build the uMQTT client layer on top of the protocol packet layer and the transport layer, exposing an interface suited to the application layer. It implements the basic connect, disconnect, subscribe, unsubscribe, and publish functions and supports sending messages at QoS 0/1/2. It uses the uplink timer to implement multiple heartbeat keep-alive and device reconnection mechanisms, which improves the device's online stability in complex network conditions.

As shown in Figure 1-1, to connect to the Broker, the embedded device must act as a client in the MQTT protocol.

In the umqtt.h file of the uMQTT component, the MQTT configuration used to initialize the client is abstracted into the following data structure:

struct umqtt_info
{
    rt_size_t send_size, recv_size;                     /* send/receive buffer sizes */
    const char *uri;                                    /* full URI (including URI + URN) */
    const char *client_id;                              /* client ID */
    const char *lwt_topic;                              /* will topic */
    const char *lwt_message;                            /* will message */
    const char *user_name;                              /* user name */
    const char *password;                               /* password */
    enum umqtt_qos lwt_qos;                             /* will QoS */
    umqtt_subscribe_cb lwt_cb;                          /* will callback function */
    rt_uint8_t reconnect_max_num;                       /* maximum number of reconnection attempts */
    rt_uint32_t reconnect_interval;                     /* maximum reconnection interval */
    rt_uint8_t keepalive_max_num;                       /* maximum keep-alive count */
    rt_uint32_t keepalive_interval;                     /* maximum keep-alive interval */
    rt_uint32_t recv_time_ms;                           /* receive timeout */
    rt_uint32_t connect_time;                           /* connect timeout */
    rt_uint32_t send_timeout;                           /* uplink (publish/subscribe/unsubscribe) timeout */
    rt_uint32_t thread_stack_size;                      /* thread stack size */
    rt_uint8_t thread_priority;                         /* thread priority */
#ifdef PKG_UMQTT_TEST_SHORT_KEEPALIVE_TIME
    rt_uint16_t connect_keepalive_sec;                  /* CONNECT keep-alive, in seconds */
#endif
};

This configuration generally needs to be filled in before a uMQTT client is created, in particular key fields such as the Broker's "URI", "user name", and "password". For other non-critical fields left unset, the client creation function umqtt_create calls umqtt_check_def_info to assign default values:

static void umqtt_check_def_info(struct umqtt_info *info)
{
    if (info) 
    {
        if (info->send_size == 0) { info->send_size = PKG_UMQTT_INFO_DEF_SENDSIZE; }
        if (info->recv_size == 0) { info->recv_size = PKG_UMQTT_INFO_DEF_RECVSIZE; }
        if (info->reconnect_max_num == 0) { info->reconnect_max_num = PKG_UMQTT_INFO_DEF_RECONNECT_MAX_NUM; }
        if (info->reconnect_interval == 0) { info->reconnect_interval = PKG_UMQTT_INFO_DEF_RECONNECT_INTERVAL; }
        if (info->keepalive_max_num == 0) { info->keepalive_max_num = PKG_UMQTT_INFO_DEF_KEEPALIVE_MAX_NUM; }
        if (info->keepalive_interval == 0) { info->keepalive_interval = PKG_UMQTT_INFO_DEF_HEARTBEAT_INTERVAL; }
        if (info->connect_time == 0) { info->connect_time = PKG_UMQTT_INFO_DEF_CONNECT_TIMEOUT; }
        if (info->recv_time_ms == 0) { info->recv_time_ms = PKG_UMQTT_INFO_DEF_RECV_TIMEOUT_MS; }
        if (info->send_timeout == 0) { info->send_timeout = PKG_UMQTT_INFO_DEF_SEND_TIMEOUT; }
        if (info->thread_stack_size == 0) { info->thread_stack_size = PKG_UMQTT_INFO_DEF_THREAD_STACK_SIZE; }
        if (info->thread_priority == 0) { info->thread_priority = PKG_UMQTT_INFO_DEF_THREAD_PRIORITY; }
    }
}

However, the information above alone is not enough to run an MQTT client. Therefore, umqtt.c defines the umqtt_client structure, which contains umqtt_info together with all the other data used to initialize the client:

struct umqtt_client
{
    int sock;                                                   /* socket */
    enum umqtt_client_state connect_state;                      /* MQTT client state */

    struct umqtt_info mqtt_info;                                /* MQTT user configuration */
    rt_uint8_t reconnect_count;                                 /* client reconnection count */
    rt_uint8_t keepalive_count;                                 /* keep-alive count */
    rt_uint32_t pingreq_last_tick;                              /* tick of the last PINGREQ */
    rt_uint32_t uplink_next_tick;                               /* next tick of the uplink */
    rt_uint32_t uplink_last_tick;                               /* last tick of the uplink */
    rt_uint32_t reconnect_next_tick;                            /* next tick for reconnection after a disconnect */
    rt_uint32_t reconnect_last_tick;                            /* last tick for reconnection after a disconnect */

    rt_uint8_t *send_buf, *recv_buf;                            /* send/receive buffer pointers */
    rt_size_t send_len, recv_len;                               /* send/receive data lengths */

    rt_uint16_t packet_id;                                      /* MQTT packet identifier */

    rt_mutex_t lock_client;                                     /* client mutex */
    rt_mq_t msg_queue;                                          /* client message queue */

    rt_timer_t uplink_timer;                                    /* keep-alive/reconnection timer */

    int sub_recv_list_len;                                      /* length of the subscription receive list */
    rt_list_t sub_recv_list;                                    /* head of the subscription message list */

    rt_list_t qos2_msg_list;                                    /* QoS 2 message list */
    struct umqtt_pubrec_msg pubrec_msg[PKG_UMQTT_QOS2_QUE_MAX]; /* PUBREC message array (QoS 2) */

    umqtt_user_callback user_handler;                           /* user callback handler */

    void *user_data;                                            /* user data */
    rt_thread_t task_handle;                                    /* uMQTT task thread */

    rt_list_t list;                                             /* uMQTT list head */
};

The structure and enumeration types of some of the members above are defined in umqtt.h. After the client creation function umqtt_create calls umqtt_check_def_info, it initializes this structure as follows:

  1. Initialize the will data (if any)

  2. Allocate memory for the send and receive buffers

  3. Create the mutex, the message queue, and the uplink timer (whose timeout callback implements reconnection and keep-alive)

  4. Initialize each linked list

  5. Create umqtt_thread, the MQTT data sending and receiving thread

  6. Return the address of the umqtt_client structure

When the value returned in step 6 is not NULL, call umqtt_start to send a CONNECT packet through LWIP and connect to the Broker. After the connection succeeds, umqtt_thread is started and MQTT communication begins.

In the umqtt_start function, the state of the uMQTT client is first set to UMQTT_CS_LINKING, which means "connecting". Next, umqtt_connect is called to connect the local client to the Broker.

Connecting to a Broker is a two-step process:

  1. Create a socket and establish a transport-layer connection with the Broker

  2. Send a CONNECT packet to establish the MQTT protocol connection

In the umqtt_connect function, the first step is completed by calling umqtt_trans_connect:

/** 
 * TCP/TLS Connection Complete for configured transport
 *
 * @param uri the input server URI address
 * @param sock the output socket
 *
 * @return <0: failed or other error
 *         =0: success
 */
int umqtt_trans_connect(const char *uri, int *sock)
{
    int _ret = 0;
    struct addrinfo *addr_res = RT_NULL;

    *sock = -1;
    /* Resolve the domain name */
    _ret = umqtt_resolve_uri(uri, &addr_res);
    if ((_ret < 0) || (addr_res == RT_NULL)) 
    {
        LOG_E("resolve uri err");
        _ret = UMQTT_FAILED;
        goto exit;
    }
    /* Create the socket */
    if ((*sock = socket(addr_res->ai_family, SOCK_STREAM, UMQTT_SOCKET_PROTOCOL)) < 0) 
    {
        LOG_E("create socket error!");
        _ret = UMQTT_FAILED;
        goto exit;
    }
    /* Configure the socket's blocking mode via FIONBIO */
    _ret = ioctlsocket(*sock, FIONBIO, 0);
    if (_ret < 0) 
    {
        LOG_E(" iocontrol socket error!");
        _ret = UMQTT_FAILED;
        goto exit;
    }
    /* Establish the connection */
    if ((_ret = connect(*sock, addr_res->ai_addr, addr_res->ai_addrlen)) < 0) 
    {
        LOG_E(" connect err!");
        closesocket(*sock);
        *sock = -1;
        _ret = UMQTT_FAILED;
        goto exit;
    }

exit:
    if (addr_res) {
        freeaddrinfo(addr_res);
        addr_res = RT_NULL;
    }
    return _ret;
}

This function is the core of how uMQTT establishes a connection with the Broker through LWIP. As the [uMQTT framework diagram](#**4.1. uMQTT structure framework**) shows, it reaches LWIP by calling the interfaces of SAL, the socket abstraction layer component. Some of the functions wrapped by the SAL component are listed below (getaddrinfo is used in the umqtt_resolve_uri function to resolve the domain name):

int getaddrinfo(const char *nodename,
       const char *servname,
       const struct addrinfo *hints,
       struct addrinfo **res)
{
    return sal_getaddrinfo(nodename, servname, hints, res);
}
---------------------------------------------------------------------------------------------
#define connect(s, name, namelen)                          sal_connect(s, name, namelen)
#define recvfrom(s, mem, len, flags, from, fromlen)        sal_recvfrom(s, mem, len, flags, from, fromlen)
#define send(s, dataptr, size, flags)                      sal_sendto(s, dataptr, size, flags, NULL, NULL)
#define socket(domain, type, protocol)                     sal_socket(domain, type, protocol)
#define closesocket(s)                                     sal_closesocket(s)
#define ioctlsocket(s, cmd, arg)                           sal_ioctlsocket(s, cmd, arg)

Once the uMQTT client has established the transport-layer connection with the Broker, it immediately sends a CONNECT packet to establish the MQTT protocol-layer connection.

The uMQTT component uses a structure combined with a union to manage all sent and received packets:

union umqtt_pkgs_msg                                /* mqtt message packet type */ 
{
    struct umqtt_pkgs_connect     connect;          /* connect */ 
    struct umqtt_pkgs_connack     connack;          /* connack */ 
    struct umqtt_pkgs_publish     publish;          /* publish */ 
    struct umqtt_pkgs_puback      puback;           /* puback */ 
    struct umqtt_pkgs_pubrec      pubrec;           /* publish receive (QoS 2, step_1st) */ 
    struct umqtt_pkgs_pubrel      pubrel;           /* publish release (QoS 2, step_2nd) */ 
    struct umqtt_pkgs_pubcomp     pubcomp;          /* publish complete (QoS 2, step_3rd) */ 
    struct umqtt_pkgs_subscribe   subscribe;        /* subscribe topic */ 
    struct umqtt_pkgs_suback      suback;           /* subscribe ack */ 
    struct umqtt_pkgs_unsubscribe unsubscribe;      /* unsubscribe topic */ 
    struct umqtt_pkgs_unsuback    unsuback;         /* unsubscribe ack */ 
};

struct umqtt_msg
{
    union umqtt_pkgs_fix_header header;             /* fix header */ 
    rt_uint32_t msg_len;                            /* message length */ 
    union umqtt_pkgs_msg msg;                       /* retain payload message */ 
};

The message types in this structure correspond to the control packet types in [Chapter 2.1.1](#2.1.1 MQTT Control Packet type). (PINGREQ and PINGRESP packets are only two bytes each, so no structure is needed to manage them.) The umqtt_encode function calls a different packet assembly function for each type; that function fills in the corresponding format, and the result is then sent to the Broker:

/** 
 * packaging the data according to the format
 *
 * @param type the input packaging type
 * @param send_buf the output send buf, result of the package
 * @param send_len the output send buffer length
 * @param message the input message
 *
 * @return <=0: failed or other error
 *         >0: package data length
 */
int umqtt_encode(enum umqtt_type type, rt_uint8_t *send_buf, size_t send_len, struct umqtt_msg *message)
{
    int _ret = 0;
    switch (type)
    {
    case UMQTT_TYPE_CONNECT:
        _ret = umqtt_connect_encode(send_buf, send_len, &(message->msg.connect));
        break;
    case UMQTT_TYPE_PUBLISH:
        _ret = umqtt_publish_encode(send_buf, send_len, message->header.bits.dup, message->header.bits.qos, &(message->msg.publish));
        break;
    case UMQTT_TYPE_PUBACK:
        _ret = umqtt_puback_encode(send_buf, send_len, message->msg.puback.packet_id);
        break;
    case UMQTT_TYPE_PUBREC:
        // _ret = umqtt_pubrec_encode();
        break;
    case UMQTT_TYPE_PUBREL:
        _ret = umqtt_pubrel_encode(send_buf, send_len, message->header.bits.dup, message->msg.pubrel.packet_id);
        break;
    case UMQTT_TYPE_PUBCOMP:
        _ret = umqtt_pubcomp_encode(send_buf, send_len, message->msg.pubcomp.packet_id);
        break;
    case UMQTT_TYPE_SUBSCRIBE:
        _ret = umqtt_subscribe_encode(send_buf, send_len, &(message->msg.subscribe));
        break;
    case UMQTT_TYPE_UNSUBSCRIBE:
        _ret = umqtt_unsubscribe_encode(send_buf, send_len, &(message->msg.unsubscribe));
        break;
    case UMQTT_TYPE_PINGREQ:
        _ret = umqtt_pingreq_encode(send_buf, send_len);
        break;
    case UMQTT_TYPE_DISCONNECT:
        _ret = umqtt_disconnect_encode(send_buf, send_len);
        break;
    default:
        break;
    }
    return _ret;
}

The enumeration of MQTT control packet types corresponds to [2.1.1 MQTT Control Packet type](#2.1.1 MQTT Control Packet type):

enum umqtt_type
{
    UMQTT_TYPE_RESERVED      = 0,
    UMQTT_TYPE_CONNECT       = 1,
    UMQTT_TYPE_CONNACK       = 2,
    UMQTT_TYPE_PUBLISH       = 3,
    UMQTT_TYPE_PUBACK        = 4,
    UMQTT_TYPE_PUBREC        = 5,
    UMQTT_TYPE_PUBREL        = 6,
    UMQTT_TYPE_PUBCOMP       = 7,
    UMQTT_TYPE_SUBSCRIBE     = 8,
    UMQTT_TYPE_SUBACK        = 9,
    UMQTT_TYPE_UNSUBSCRIBE   = 10,
    UMQTT_TYPE_UNSUBACK      = 11,
    UMQTT_TYPE_PINGREQ       = 12,
    UMQTT_TYPE_PINGRESP      = 13,
    UMQTT_TYPE_DISCONNECT    = 14,
};

Since there are many packet types, we take the CONNECT packet as an example to briefly describe how uMQTT assembles a packet. Its variable header contains "protocol name", "protocol level", "connect flags", and "keep alive interval (seconds)"; its payload contains "client identifier", "will topic", "will message", "user name", and "password":

  1. Fill in the default configuration information of the MQTT client

        encode_msg.msg.connect.protocol_name_len = PKG_UMQTT_PROTOCOL_NAME_LEN;
        encode_msg.msg.connect.protocol_name = PKG_UMQTT_PROTOCOL_NAME;
        encode_msg.msg.connect.protocol_level = PKG_UMQTT_PROTOCOL_LEVEL; /* MQTT3.1.1 ver_lvl:4;  MQTT3.1 ver_lvl:3 */ 
        encode_msg.msg.connect.connect_flags.connect_sign = UMQTT_DEF_CONNECT_FLAGS;
    #ifdef PKG_UMQTT_TEST_SHORT_KEEPALIVE_TIME
        encode_msg.msg.connect.keepalive_interval_sec = ((client->mqtt_info.connect_keepalive_sec == 0) ? PKG_UMQTT_CONNECT_KEEPALIVE_DEF_TIME : client->mqtt_info.connect_keepalive_sec);
    #else
        encode_msg.msg.connect.keepalive_interval_sec = PKG_UMQTT_CONNECT_KEEPALIVE_DEF_TIME;
    #endif
        encode_msg.msg.connect.client_id = client->mqtt_info.client_id;
        encode_msg.msg.connect.will_topic = client->mqtt_info.lwt_topic;
        encode_msg.msg.connect.will_message = client->mqtt_info.lwt_message;
        encode_msg.msg.connect.user_name = client->mqtt_info.user_name;
        if (client->mqtt_info.user_name) 
        {
            encode_msg.msg.connect.connect_flags.bits.username_flag = 1;
        }
        encode_msg.msg.connect.password = client->mqtt_info.password;
        if (client->mqtt_info.password) {
            encode_msg.msg.connect.connect_flags.bits.password_flag = 1;
            encode_msg.msg.connect.password_len = rt_strlen(client->mqtt_info.password);
        }
  2. Call umqtt_encode, which invokes the umqtt_connect_encode encoding function (a thin wrapper around MQTTSerialize_connect), to assemble the packet:

    static int MQTTSerialize_connect(unsigned char* buf, int buflen, MQTTPacket_connectData* options)
    {
        unsigned char *ptr = buf;
        MQTTHeader header = { 0 };
        int len = 0;
        int rc = -1;
    
        if (umqtt_pkgs_len(len = MQTTSerialize_connectLength(options)) > buflen) 
        {
            rc = UMQTT_BUFFER_TOO_SHORT;
            goto exit;
        }
    
        header.byte = 0;
        header.bits.type = UMQTT_TYPE_CONNECT;
        umqtt_writeChar(&ptr, header.byte);                                 /* write the first byte of the fixed header */
    
        ptr += umqtt_pkgs_encode(ptr, len);                                 /* write the Remaining Length */
    
        if (options->protocol_level == 4)                                     /* MQTT V3.1.1 */
        {
            umqtt_writeCString(&ptr, "MQTT");
            umqtt_writeChar(&ptr, (char) 4);
        } 
        else 
        {
            umqtt_writeCString(&ptr, "MQIsdp");                                /* MQTT V3.1 */
            umqtt_writeChar(&ptr, (char) 3);
        }
    
        umqtt_writeChar(&ptr, options->connect_flags.connect_sign);
        umqtt_writeInt(&ptr, options->keepalive_interval_sec);
        // umqtt_writeInt(&ptr, PKG_UMQTT_CONNECT_KEEPALIVE_DEF_TIME);                                       /* ping interval max, 0xffff */ 
        umqtt_writeMQTTString(&ptr, options->client_id);
        if (options->connect_flags.bits.will_flag) 
        {
            umqtt_writeMQTTString(&ptr, options->will_topic);
            umqtt_writeMQTTString(&ptr, options->will_message);
        }
    
        if (options->connect_flags.bits.username_flag)
            umqtt_writeMQTTString(&ptr, options->user_name);
        if (options->connect_flags.bits.password_flag)
            umqtt_writeMQTTString(&ptr, options->password);
    
        rc = ptr - buf;
    
    exit:
        return rc;
    }

    The function first calls MQTTSerialize_connectLength to calculate the combined length of the variable header and payload. The resulting len is passed to umqtt_pkgs_len, which adds the number of bytes occupied by the Remaining Length field in the fixed header plus 1 for the first byte of the fixed header. The result is compared with buflen to check that the packet fits in the buffer.

    Why is the compound statement if (umqtt_pkgs_len(len = MQTTSerialize_connectLength(options)) > buflen) used here?

    Because the len we want to keep is the value of the Remaining Length field in the fixed header, which is what the subsequent assembly steps need, while the total packet length is len + 1 + the number of bytes in the Remaining Length field. If the total packet length were computed directly, we would later have to subtract the size of the Remaining Length field and the 1-byte first header byte before writing the Remaining Length value, which is more cumbersome.

    The following structures and unions map onto fields of the MQTT protocol:

    • Fixed header

      Please refer to [**2.1. Fixed header**](#**2.1. Fixed header**)

      union umqtt_pkgs_fix_header
      {
          rt_uint8_t byte;                                /* header */ 
          struct {        
              rt_uint8_t retain: 1;                       /* retain flag */
              rt_uint8_t qos:    2;                       /* QoS, 0-At most once; 1-At least once; 2-Exactly once */
              rt_uint8_t dup:    1;                       /* dup flag */ 
              rt_uint8_t type:   4;                       /* MQTT packet type */ 
          } bits;
      };
    • CONNECT flag

      union umqtt_pkgs_connect_sign
      {
          rt_uint8_t connect_sign;
          struct {
              rt_uint8_t reserved:       1;               /* reserved bits */ 
              rt_uint8_t clean_session:  1;               /* clean session bit */ 
              rt_uint8_t will_flag:      1;               /* will flag bit */ 
              rt_uint8_t will_Qos:       2;               /* will Qos bit */ 
              rt_uint8_t will_retain:    1;               /* will retain bit */ 
              rt_uint8_t password_flag:  1;               /* password flag bit */ 
              rt_uint8_t username_flag:  1;               /* user name flag bit */ 
          } bits;
      };
      -------------------------------------------------------------------------------
      #define UMQTT_SET_CONNECT_FLAGS(user_name_flag, password_flag, will_retain, will_qos, will_flag, clean_session, reserved)    \
          (((user_name_flag & 0x01) << 7) |    \
          ((password_flag & 0x01) << 6) |  \
          ((will_retain & 0x01) << 5) |   \
          ((will_qos & 0x03) << 3) |   \
          ((will_flag & 0x01) << 2) | \
          ((clean_session & 0x01) << 1) | \
          (reserved & 0x01))
      #define UMQTT_DEF_CONNECT_FLAGS                             (UMQTT_SET_CONNECT_FLAGS(0,0,0,0,0,1,0))
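As a cross-check on the Connect Flags layout, the byte can also be composed with a plain function. This is a sketch based on the MQTT 3.1.1 layout (user name in bit 7, password in bit 6, will retain in bit 5, will QoS in bits 4..3, will flag in bit 2, clean session in bit 1, reserved bit 0 always 0); the connect_options structure and connect_flags_byte function are hypothetical and are not uMQTT APIs. Note that will QoS occupies two bits, so it is masked with 0x03.

```c
#include <stdint.h>

/* Hypothetical description of the options that influence the Connect Flags byte. */
struct connect_options {
    uint8_t has_user_name;   /* 1 if a user name is present in the payload */
    uint8_t has_password;    /* 1 if a password is present in the payload */
    uint8_t will_retain;     /* 1 if the will message should be retained */
    uint8_t will_qos;        /* 0, 1, or 2 */
    uint8_t will_flag;       /* 1 if a will topic and message are present */
    uint8_t clean_session;   /* 1 to start a clean session */
};

/* Compose the Connect Flags byte of the CONNECT variable header. */
static uint8_t connect_flags_byte(const struct connect_options *opt)
{
    return (uint8_t)(((opt->has_user_name & 0x01) << 7) |
                     ((opt->has_password  & 0x01) << 6) |
                     ((opt->will_retain   & 0x01) << 5) |
                     ((opt->will_qos      & 0x03) << 3) |
                     ((opt->will_flag     & 0x01) << 2) |
                     ((opt->clean_session & 0x01) << 1));
}
```

With only clean session set, the result is 0x02, which matches the default flags shown above; adding a user name and password gives 0xC2.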

After the packet has been assembled, the umqtt_trans_send function is called to send the contents of the send buffer through LWIP to the Broker connected on the socket:

/** 
 * TCP/TLS send data on the configured transport.
 *
 * @param sock the input socket
 * @param send_buf the input, transport data buffer
 * @param buf_len the input, transport data buffer length
 * @param timeout the input, tcp/tls transport timeout
 *
 * @return <0: failed or other error
 *         >=0: success (the return value of the last send() call)
 */
int umqtt_trans_send(int sock, const rt_uint8_t *send_buf, rt_uint32_t buf_len, int timeout)
{
    int _ret = 0;
    rt_uint32_t offset = 0U;
    while (offset < buf_len) 
    {
        _ret = send(sock, send_buf + offset, buf_len - offset, 0);
        if (_ret < 0) 
            return -errno;
        offset += _ret;
    }

    return _ret;
}
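The loop above exists because a stream socket may accept fewer bytes than requested in a single call. The following self-contained sketch shows the same pattern against a stand-in transport so it can run without a network stack. Everything here is hypothetical: send_fn, send_all, chunked_sink, and chunked_send are illustration-only names, and unlike the function above this variant returns 0 on success and treats a zero-byte write as an error to avoid spinning forever.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Stand-in for send(): returns the number of bytes accepted, or <0 on error. */
typedef int (*send_fn)(void *ctx, const uint8_t *data, size_t len);

/* Keep calling `fn` until the whole buffer has been handed over.
 * Returns 0 on success or a negative value on failure. */
static int send_all(send_fn fn, void *ctx, const uint8_t *buf, size_t len)
{
    size_t offset = 0;

    while (offset < len) {
        int n = fn(ctx, buf + offset, len - offset);
        if (n < 0) return n;     /* transport error */
        if (n == 0) return -1;   /* no progress; give up instead of looping */
        offset += (size_t)n;
    }
    return 0;
}

/* Test double: accepts at most `max_chunk` bytes per call, like a busy socket. */
struct chunked_sink {
    uint8_t data[64];
    size_t used;
    size_t max_chunk;
};

static int chunked_send(void *ctx, const uint8_t *data, size_t len)
{
    struct chunked_sink *sink = (struct chunked_sink *)ctx;
    size_t n = (len < sink->max_chunk) ? len : sink->max_chunk;

    if (sink->used + n > sizeof sink->data) return -1;
    memcpy(sink->data + sink->used, data, n);
    sink->used += n;
    return (int)n;
}
```

Sending a 7-byte packet through a sink limited to 3 bytes per call takes three calls, and the sink ends up holding the bytes in their original order.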

After uMQTT finishes sending the CONNECT packet, it calls the umqtt_handle_readpacket function to read the Broker's reply and unpack the received data. (After the CONNECT process completes, this function is also called in a loop inside umqtt_thread to send and receive data.)

static int umqtt_handle_readpacket(struct umqtt_client *client)
{
    int _ret = 0, _onedata = 0, _cnt = 0, _loop_cnt = 0, _remain_len = 0;
    int _temp_ret = 0;
    int _pkt_len = 0;
    int _multiplier = 1;
    int _pkt_type = 0;
    struct umqtt_msg decode_msg = { 0 };
    struct umqtt_msg_ack msg_ack = { 0 };
    struct umqtt_msg encode_msg = { 0 };
    RT_ASSERT(client);

    /* 1. Read the first byte of the fixed header */
    _temp_ret = umqtt_trans_recv(client->sock, client->recv_buf, 1);
    if (_temp_ret <= 0) 
    {
        _ret = UMQTT_FIN_ACK;
        LOG_W(" server fin ack! connect failed! need to reconnect!");
        goto exit;
    } 

    /* 2. Read the Remaining Length field of the fixed header and parse the remaining length */
    do {
        if (++_cnt > MAX_NO_OF_REMAINING_LENGTH_BYTES) 
        {
            _ret = UMQTT_FAILED;
            LOG_E(" umqtt packet length error!");
            goto exit;
        }
        _ret = umqtt_readpacket(client, (unsigned char *)&_onedata, 1, client->mqtt_info.recv_time_ms);
        if (_ret == UMQTT_FIN_ACK)
        {
            LOG_W(" server fin ack! connect failed! need to reconnect!");
            goto exit;
        }
        else if (_ret != UMQTT_OK) 
        {
            _ret = UMQTT_READ_FAILED;
            goto exit;
        }
        *(client->recv_buf + _cnt) = _onedata;
        _pkt_len += (_onedata & 0x7F) * _multiplier;
        _multiplier *= 0x80; 
    } while ((_onedata & 0x80) != 0);

    /* Error handling: if the packet is longer than the receive buffer, read and discard all of its data from the socket, then return UMQTT_BUFFER_TOO_SHORT */
    if ((_pkt_len + 1 + _cnt) > client->mqtt_info.recv_size)
    {
        LOG_W(" socket read buffer too short! will read and delete socket buff! ");
        _loop_cnt = _pkt_len / client->mqtt_info.recv_size;

        do 
        {
            if (_loop_cnt == 0)
            {
                umqtt_readpacket(client, client->recv_buf, _pkt_len, client->mqtt_info.recv_time_ms);
                _ret = UMQTT_BUFFER_TOO_SHORT;
                LOG_W(" finish read and delete socket buff!");
                goto exit;
            }
            else 
            {
                _loop_cnt--;
                umqtt_readpacket(client, client->recv_buf, client->mqtt_info.recv_size, client->mqtt_info.recv_time_ms);
                _pkt_len -= client->mqtt_info.recv_size;
            }
        }while(1);
    }

    /* 3. Read the remaining data: variable header + payload */
    _ret = umqtt_readpacket(client, client->recv_buf + _cnt + 1, _pkt_len, client->mqtt_info.recv_time_ms);
    if (_ret == UMQTT_FIN_ACK)
    {
        LOG_W(" server fin ack! connect failed! need to reconnect!");
        goto exit;
    }
    else if (_ret != UMQTT_OK)
    {
        _ret = UMQTT_READ_FAILED;
        LOG_E(" read remain datas error!");
        goto exit;
    }

    /* 4. Parse the packet and handle it according to its packet type */
    rt_memset(&decode_msg, 0, sizeof(decode_msg));
    _ret = umqtt_decode(client->recv_buf, _pkt_len + _cnt + 1, &decode_msg);
    if (_ret < 0) 
    {
        _ret = UMQTT_DECODE_ERROR;
        LOG_E(" decode error!");
        goto exit;
    }
    _pkt_type = decode_msg.header.bits.type;
    switch (_pkt_type)
    {
    case UMQTT_TYPE_CONNACK:
        {
            LOG_D(" read connack cmd information!");
            set_uplink_recon_tick(client, UPLINK_NEXT_TICK);
            set_connect_status(client, UMQTT_CS_LINKED);
        }
        break;
    case UMQTT_TYPE_PUBLISH:
        {
            LOG_D(" read publish cmd information!");
            set_uplink_recon_tick(client, UPLINK_NEXT_TICK);

            if (decode_msg.header.bits.qos != UMQTT_QOS2) 
            {
                LOG_D(" qos: %d, deliver message! topic nme: %s ", decode_msg.header.bits.qos, decode_msg.msg.publish.topic_name);
                umqtt_deliver_message(client, decode_msg.msg.publish.topic_name, decode_msg.msg.publish.topic_name_len, 
                                    &(decode_msg.msg.publish));
            }

            if (decode_msg.header.bits.qos != UMQTT_QOS0)
            {   
                rt_memset(&encode_msg, 0, sizeof(encode_msg));
                encode_msg.header.bits.qos = decode_msg.header.bits.qos;
                encode_msg.header.bits.dup = decode_msg.header.bits.dup;
                if (decode_msg.header.bits.qos == UMQTT_QOS1)
                {
                    encode_msg.header.bits.type = UMQTT_TYPE_PUBACK;
                    encode_msg.msg.puback.packet_id = decode_msg.msg.publish.packet_id;
                }
                else if (decode_msg.header.bits.qos == UMQTT_QOS2)
                {
                    encode_msg.header.bits.type = UMQTT_TYPE_PUBREC;
                    add_one_qos2_msg(client, &(decode_msg.msg.publish));
                    encode_msg.msg.pubrel.packet_id = decode_msg.msg.publish.packet_id;
                    add_one_pubrec_msg(client, encode_msg.msg.pubrel.packet_id);        /* add pubrec message */
                }

                _ret = umqtt_encode(encode_msg.header.bits.type, client->send_buf, client->mqtt_info.send_size, 
                                    &encode_msg);
                if (_ret < 0)
                {
                    _ret = UMQTT_ENCODE_ERROR;
                    LOG_E(" puback / pubrec failed!");
                    goto exit;
                }
                client->send_len = _ret;

                _ret = umqtt_trans_send(client->sock, client->send_buf, client->send_len, 
                                        client->mqtt_info.send_timeout);
                if (_ret < 0) 
                {
                    _ret = UMQTT_SEND_FAILED;
                    LOG_E(" trans send failed!");
                    goto exit;                    
                }

            }
        }
        break;
    case UMQTT_TYPE_PUBACK:
        {
            LOG_D(" read puback cmd information!");
            rt_memset(&msg_ack, 0, sizeof(msg_ack));
            msg_ack.msg_type = UMQTT_TYPE_PUBACK;
            msg_ack.packet_id = decode_msg.msg.puback.packet_id;
            _ret = rt_mq_send(client->msg_queue, &msg_ack, sizeof(struct umqtt_msg_ack));
            if (_ret != RT_EOK) 
            {
                _ret = UMQTT_SEND_FAILED;
                LOG_E(" mq send failed!");
                goto exit;
            }
            set_uplink_recon_tick(client, UPLINK_NEXT_TICK);
        }
        break;
    case UMQTT_TYPE_PUBREC:
        {
            LOG_D(" read pubrec cmd information!");
            rt_memset(&msg_ack, 0, sizeof(msg_ack));
            msg_ack.msg_type = UMQTT_TYPE_PUBREC;
            msg_ack.packet_id = decode_msg.msg.puback.packet_id;
            _ret = rt_mq_send(client->msg_queue, &msg_ack, sizeof(struct umqtt_msg_ack));
            if (_ret != RT_EOK)
            {
                _ret = UMQTT_SEND_FAILED;
                LOG_E(" mq send failed!");
                goto exit;
            }
            set_uplink_recon_tick(client, UPLINK_NEXT_TICK);
        }
        break;
    case UMQTT_TYPE_PUBREL:
        {
            LOG_D(" read pubrel cmd information!");   

            rt_memset(&encode_msg, 0, sizeof(encode_msg));
            encode_msg.header.bits.type = UMQTT_TYPE_PUBCOMP;
            encode_msg.header.bits.qos = decode_msg.header.bits.qos;
            encode_msg.header.bits.dup = decode_msg.header.bits.dup;            
            encode_msg.msg.pubrel.packet_id = decode_msg.msg.pubrec.packet_id;

            /* publish callback, and delete callback */
            qos2_publish_delete(client, encode_msg.msg.pubrel.packet_id);

            /* delete array numbers! */
            clear_one_pubrec_msg(client, encode_msg.msg.pubrel.packet_id);

            _ret = umqtt_encode(UMQTT_TYPE_PUBCOMP, client->send_buf, client->mqtt_info.send_size, &encode_msg);
            if (_ret < 0)
            {
                _ret = UMQTT_ENCODE_ERROR;
                LOG_E(" pubcomp failed!");
                goto exit;
            }
            client->send_len = _ret;

            _ret = umqtt_trans_send(client->sock, client->send_buf, client->send_len, 
                                    client->mqtt_info.send_timeout);
            if (_ret < 0) 
            {
                _ret = UMQTT_SEND_FAILED;
                LOG_E(" trans send failed!");
                goto exit;                    
            }

        }
        break;
    case UMQTT_TYPE_PUBCOMP:
        {
            LOG_D(" read pubcomp cmd information!");

            rt_memset(&msg_ack, 0, sizeof(msg_ack));
            msg_ack.msg_type = UMQTT_TYPE_PUBCOMP;
            msg_ack.packet_id = decode_msg.msg.pubcomp.packet_id;
            _ret = rt_mq_send(client->msg_queue, &msg_ack, sizeof(struct umqtt_msg_ack));
            if (_ret != RT_EOK) 
            {
                _ret = UMQTT_SEND_FAILED;
                goto exit;
            }            
        }
        break;
    case UMQTT_TYPE_SUBACK:
        {
            LOG_D(" read suback cmd information!");

            rt_memset(&msg_ack, 0, sizeof(msg_ack));
            msg_ack.msg_type = UMQTT_TYPE_SUBACK;
            msg_ack.packet_id = decode_msg.msg.suback.packet_id;

            set_uplink_recon_tick(client, UPLINK_NEXT_TICK);

            _ret = rt_mq_send(client->msg_queue, &msg_ack, sizeof(struct umqtt_msg_ack));
            if (_ret != RT_EOK) 
            {
                _ret = UMQTT_SEND_FAILED;
                goto exit;
            }
        }
        break;
    case UMQTT_TYPE_UNSUBACK:
        {
            LOG_D(" read unsuback cmd information!");

            rt_memset(&msg_ack, 0, sizeof(msg_ack));
            msg_ack.msg_type = UMQTT_TYPE_UNSUBACK;
            msg_ack.packet_id = decode_msg.msg.unsuback.packet_id;

            set_uplink_recon_tick(client, UPLINK_NEXT_TICK);

            _ret = rt_mq_send(client->msg_queue, &msg_ack, sizeof(struct umqtt_msg_ack));
            if (_ret != RT_EOK) 
            {
                _ret = UMQTT_SEND_FAILED;
                goto exit;
            }

        }
        break;
    case UMQTT_TYPE_PINGRESP:
        {
            LOG_I(" ping resp! broker -> client! now tick: %d ", rt_tick_get());
            set_uplink_recon_tick(client, UPLINK_NEXT_TICK);
        }
        break;
    default:
        {
            LOG_W(" not right type(0x%02x)!", _pkt_type);
        }
        break;
    }

exit:
    return _ret;
}
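
The switch statement above decides, from the packet type and QoS in the first fixed-header byte, which acknowledgement the client must send back: PUBACK for a QoS 1 PUBLISH, PUBREC for a QoS 2 PUBLISH, and PUBCOMP for a PUBREL. The sketch below isolates that decision. The names pkt_type, fixed_hdr, fixed_hdr_parse and reply_type_for are hypothetical. The type values and bit positions are assumed from the MQTT 3.1.1 specification, not taken from the uMQTT headers.

```c
#include <assert.h>
#include <stdint.h>

/* MQTT 3.1.1 control packet types used below (values from the specification). */
enum pkt_type { PKT_PUBLISH = 3, PKT_PUBACK = 4, PKT_PUBREC = 5,
                PKT_PUBREL = 6, PKT_PUBCOMP = 7 };

/* Hypothetical decoded view of the first fixed-header byte. */
struct fixed_hdr { unsigned type, dup, qos, retain; };

static struct fixed_hdr fixed_hdr_parse(uint8_t byte)
{
    struct fixed_hdr h;

    h.type   = (byte >> 4) & 0x0Fu;   /* bits 7-4: packet type */
    h.dup    = (byte >> 3) & 0x01u;   /* bit 3: duplicate delivery */
    h.qos    = (byte >> 1) & 0x03u;   /* bits 2-1: QoS level */
    h.retain =  byte       & 0x01u;   /* bit 0: retain */
    return h;
}

/* Returns the packet type the receiver must send back, or 0 if none.
 * Precondition: the caller has already rejected the malformed QoS value 3. */
static unsigned reply_type_for(struct fixed_hdr h)
{
    assert(h.qos <= 2);
    if (h.type == PKT_PUBLISH)
    {
        if (h.qos == 1) return PKT_PUBACK;   /* at least once */
        if (h.qos == 2) return PKT_PUBREC;   /* exactly once, first step */
        return 0;                            /* QoS 0: no acknowledgement */
    }
    if (h.type == PKT_PUBREL)
        return PKT_PUBCOMP;                  /* exactly once, final step */
    return 0;
}
```

In the function above, a QoS 0 or QoS 1 message is delivered to the application as soon as the PUBLISH arrives, while a QoS 2 message is stored and delivered only when the matching PUBREL is received.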

The key steps above are briefly described below:

  1. Read the first byte of the Fixed header

    Here the umqtt_trans_recv function is called to read the socket data:

    /** 
     * TCP/TLS receive datas on configured transport.
     *
     * @param sock the input socket
     * @param recv_buf the output, receive datas buffer
     * @param buf_len the input, receive datas buffer length
     *
     * @return <=0: failed or other error
     *         >0: receive datas length
     */
    int umqtt_trans_recv(int sock, rt_uint8_t *recv_buf, rt_uint32_t buf_len)
    {
        return recv(sock, recv_buf, buf_len, 0);
        // return read(sock, recv_buf, buf_len);
    }
    -------------------------------------------------------------------------
    #define recv(s, mem, len, flags)                           sal_recvfrom(s, mem, len, flags, NULL, NULL)

    As can be seen, this function is actually a wrapper around the SAL-layer function sal_recvfrom, which reads up to buf_len bytes from the given sock into recv_buf.

  2. Read the Remaining length field of the Fixed header and parse the remaining length

    This step is not described in detail here; the algorithm follows the rules in [2.1.3 Remaining Length](#2.1.3 Remaining Length).

  3. Read remaining data - variable header + payload

    This step needs no further explanation.

  4. Parse the packet and handle it according to its message type

    A key structure and the unpacking function are involved here:

    • struct umqtt_msg decode_msg = { 0 };

      For the structure members, please refer to **[4.4. uMQTT sends a package](#4.4. uMQTT sends a package)**.

    • /** 
       * parse the data according to the format
       *
       * @param recv_buf the input, the raw buffer data, of the correct length determined by the remaining length field
       * @param recv_buf_len the input, the length in bytes of the data in the supplied buffer
       * @param message the output datas
       *
       * @return <0: failed or other error
       *         =0: success
       */
      int umqtt_decode(rt_uint8_t *recv_buf, size_t recv_buf_len, struct umqtt_msg *message)
      {
          int _ret = 0;
          rt_uint8_t* curdata = recv_buf;
          enum umqtt_type type;
          if (message == RT_NULL) 
          {
              _ret = UMQTT_INPARAMS_NULL;
              LOG_E(" umqtt decode inparams null!");
              goto exit;
          }
      
          message->header.byte = umqtt_readChar(&curdata);
          type = message->header.bits.type;
      
          switch (type)
          {
          case UMQTT_TYPE_CONNACK:
              _ret = umqtt_connack_decode(&(message->msg.connack), recv_buf, recv_buf_len);
              break;
          case UMQTT_TYPE_PUBLISH:
              _ret = umqtt_publish_decode(message, recv_buf, recv_buf_len);
              break;
          case UMQTT_TYPE_PUBACK:
              _ret = umqtt_puback_decode(message, recv_buf, recv_buf_len);
              break;
          case UMQTT_TYPE_PUBREC:
              // _ret = umqtt_pubrec_decode();
              break;
          case UMQTT_TYPE_PUBREL:
              // _ret = umqtt_pubrel_decode();
              break;
          case UMQTT_TYPE_PUBCOMP:
              // _ret = umqtt_pubcomp_decode();
              break;
          case UMQTT_TYPE_SUBACK:
              _ret = umqtt_suback_decode(&(message->msg.suback), recv_buf, recv_buf_len);
              break;
          case UMQTT_TYPE_UNSUBACK:
              _ret = umqtt_unsuback_decode(&(message->msg.unsuback), recv_buf, recv_buf_len);
              break;
          case UMQTT_TYPE_PINGRESP:
              // _ret = umqtt_pingresp_encode();
              break;
          default:
              break;
          }
      exit:
          return _ret;
      }

      There are many types of messages; here we take only the CONNACK message as an example:

      static int umqtt_connack_decode(struct umqtt_pkgs_connack *connack_msg, rt_uint8_t* buf, int buflen)
      {
          MQTTHeader header = {0};
          unsigned char* curdata = buf;
          unsigned char* enddata = NULL;
          int rc = 0;
          int mylen;
      
          header.byte = umqtt_readChar(&curdata);
          if (header.bits.type != UMQTT_TYPE_CONNACK)
          {
              rc = UMQTT_FAILED;
              LOG_E(" not connack type!");
              goto exit;
          }
      
          curdata += (rc = umqtt_pkgs_decodeBuf(curdata, &mylen)); /* read remaining length */
          enddata = curdata + mylen;
          if (enddata - curdata < 2)
          {
              LOG_D(" enddata:%d, curdata:%d, mylen:%d", enddata, curdata, mylen);
              goto exit;
          }
      
          connack_msg->connack_flags.connack_sign = umqtt_readChar(&curdata);
          connack_msg->ret_code = umqtt_readChar(&curdata);
      exit:
          return rc;
      }
      -----------------------------------------------------------------------------------------
      union umqtt_pkgs_connack_sign
      {
          rt_uint8_t connack_sign;
          struct {
              rt_uint8_t sp:             1;               /* session present bit */ 
              rt_uint8_t reserved:       7;               /* reserved bits */ 
          } bits;
      };
      struct umqtt_pkgs_connack
      {
          /* variable header */ 
          union umqtt_pkgs_connack_sign connack_flags;    /* connect flags */ 
          enum umqtt_connack_retcode ret_code;            /* connect return code */ 
          /* payload = NULL */ 
      };

      It is still the familiar routine: read the Fixed header → read the Remaining length → read the Variable header and parse the related flags.

  5. UMQTT_TYPE_CONNACK:

    Call the set_uplink_recon_tick(client, UPLINK_NEXT_TICK) function to set the next reconnection tick value, and call the set_connect_status(client, UMQTT_CS_LINKED) function to set the uMQTT client state to connected.
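
Steps 2 and 4 above can be sketched together as a stand-alone CONNACK parser. It reads the fixed header, decodes the variable-length Remaining Length field, and then reads the two-byte variable header. This is a minimal illustration, not the uMQTT code. The names remaining_length_decode, connack_info and connack_parse are hypothetical. The sketch assumes MQTT 3.1.1, where the Remaining Length of a CONNACK packet is always 2.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Decode the variable-length Remaining Length field (1 to 4 bytes).
 * Returns the number of bytes consumed, or -1 if malformed or truncated. */
static int remaining_length_decode(const uint8_t *buf, size_t len, uint32_t *value)
{
    uint32_t multiplier = 1, result = 0;
    size_t i;

    assert(buf != NULL && value != NULL);
    for (i = 0; i < len && i < 4; i++)
    {
        result += (uint32_t)(buf[i] & 0x7Fu) * multiplier;  /* low 7 bits carry data */
        if ((buf[i] & 0x80u) == 0)                          /* continuation bit clear: last byte */
        {
            *value = result;
            return (int)(i + 1);
        }
        multiplier *= 128;
    }
    return -1;
}

/* Hypothetical result type for a parsed CONNACK packet. */
struct connack_info { unsigned session_present; unsigned return_code; };

/* Parse a complete CONNACK packet: fixed header, Remaining Length, variable header.
 * Returns 0 on success, -1 on any format error. */
static int connack_parse(const uint8_t *pkt, size_t len, struct connack_info *out)
{
    uint32_t remaining = 0;
    int used;

    assert(pkt != NULL && out != NULL);
    if (len < 1 || (pkt[0] >> 4) != 2)                      /* packet type 2 = CONNACK */
        return -1;
    used = remaining_length_decode(pkt + 1, len - 1, &remaining);
    if (used < 0 || remaining != 2 || len < (size_t)(1 + used) + remaining)
        return -1;
    out->session_present = pkt[1 + used] & 0x01u;           /* connect acknowledge flags */
    out->return_code     = pkt[2 + used];                   /* 0 means connection accepted */
    return 0;
}
```

For the four bytes 0x20 0x02 0x00 0x00, connack_parse reports success with a return code of 0, which corresponds to the case where the client state is set to connected.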

At this point, the sending and receiving process of the CONNECT message is complete. Next, the umqtt_thread thread is started and calls the umqtt_handle_readpacket function to process the data messages received from the Broker. The processing is similar to the above and is not repeated here. For the specific content and related processes, see Figure 4-2:

Figure 4-2 MQTT communication flow chart

In summary, Figure 4-3 outlines some of the important function calls in uMQTT. Since a flowchart cannot show many of the details, the functional flow should also be studied in the actual code.

Figure 4-3 umqtt important function flow chart


Assoc. Prof. Wiroon Sriborrirux, Founder of Advance Innovation Center (AIC) and Bangsaen Design House (BDH), Electrical Engineering Department, Faculty of Engineering, Burapha University