MQTT is a machine-to-machine ( M2M)/Internet of Things ( IoT) connection protocol. Its full English name is " Message Queuing Telemetry Transport ", and its Chinese translation is " Message Queuing Telemetry Transport " protocol. It is designed for constrained devices and low-bandwidth, high-latency or unreliable networks. It is a "lightweight" communication protocol based on the 发布/ 订阅( publish/ subscribe) mode. The protocol is built on TCP/IPthe protocol and was released by IBM in 1999. At present, the latest version of the protocol is V5.0, and the commonly used version is V3.1.1.
Let's take an example to illustrate the actual application scenario of MQTT. As shown in Figure 1-1, it well shows a communication network system case based on the MQTT protocol:
Figure 1-1 Communication example based on MQTT
Definition of noun:
Publisher
Broker
Subscriber
Topic - the topic for publish/subscribe
Process Overview: In the figure above, the role of each type of sensor is publisher. For example, the humidity sensor and temperature sensor publish two topics named "Moisture" and "Temp" to the connected MQTT Broker (periodically); of course, along with the two topics, there are also humidity values and temperature values, which are called "messages". The role of several clients is subscriber. For example, if the mobile phone APP subscribes to the "Temp" topic from the Broker, the temperature value published by the temperature sensor in the Broker can be obtained on the mobile phone.
Additional notes:
The roles of publishers and subscribers are not fixed but relative. A publisher can also subscribe to topics from a broker at the same time, and a subscriber can publish topics to a broker. That is, a publisher can be a subscriber and a subscriber can be a publisher.
Broker can be an online cloud server or a local LAN client. According to the requirements, Broker itself will also include some functions of subscribing/publishing topics.
Any general/private protocol is composed of various message data packets that are pre-defined and constrained by certain rules, and MQTT is no exception. In the MQTT protocol, all data packets are composed of a maximum of three parts: 固定header+ 可变header+ 有效载荷, as shown in Table 2-1:
[Fixed header](#2.1. Fixed header)(at least 2 Bytes)
[Variable header](#2.2. Variable header)
[Payload](#2.3. Payload*)
Present in all MQTT control packets
Exists in some MQTT control packets
Exists in some MQTT control packets
Bytes[0] Bytes[1]...
... ...
...Bytes[N-1] Bytes[N]
Table 2-1 MQTT data packet format
Among them, the fixed header is required, and the variable header and payload are optional. Therefore, in theory, the minimum length of the MQTT protocol data packet is 2 bytes, which minimizes the extra resource consumption it occupies. Next, we will take MQTT3.1.1 as an example to introduce the detailed format of each part of the message (for detailed usage, see [**4. Implementation of uMQTT**](#4. Implementation of uMQTT)).
MQTT Control Packet Type Flags (Flags specific to each MQTT Control Packet type)
Bytes[1]...
Remaining Length
Table 2 - 2 Fixed header format
The fixed header consists of at least two bytes, as shown in Table 2-2. The high 4 bits of the first byte describe the type of the current data packet ([MQTT Control Packet type](#2.1.1 MQTT Control Packet type)), see Table 2-3 below; the low 4 bits define the flag bits related to the packet type ([Flags specific to each MQTT Control Packet type](#2.1.2 Flags specific to each MQTT Control Packet type)), see Table 2-4 below; the second and subsequent up to 4 bytes represent the byte length of the remaining data ([Remaining Length](#2.1.3 Remaining Length)), see Table 2-5 below.
Bits[3..0 ]** of Bytes[0] ** in the Fixed header contain the special identifiers for each MQTT control message type, as shown in Tables 2 - 4 above. Although the flags in the table are "Reserved", they are reserved for future versions, but they must be assigned the values in the table during data transmission. If an illegal flag that does not match the table is identified, the receiver must close the network connection.
Among them, three flags have been used in version 3.1.1:
DUP = Duplicate flag of control message [Duplicate]
The remaining length indicates the number of bytes in the remaining part of the current message, including the variable header and payload data. The remaining length does not include the number of bytes used to encode the remaining length field itself, that is, it does not include the length of the Fixed header.
The remaining length field uses a variable length encoding scheme, as shown in Table 2 - 5 above:
Values less than 128 are encoded using a single byte.
Larger values are handled as follows - the least significant 7 bits are used to encode the data, and the most significant bit is used to indicate whether there are more bytes. Therefore, each byte can encode 128 values and a continuation bit . The continuation bit only indicates the carry function and is not included in the calculation of the length.
Some MQTT control packets contain a variable header part, which is located between the fixed header and the payload. The content of the variable header varies depending on the control packet type ([MQTT Control Packet type](#2.1.1 MQTT Control Packet type)); for example, for a CONNECT type packet, its variable header contains four fields: "Protocol Name", "Protocol Level", "Connect Flags" and "Keep Alive". For other variable headers, see [MQTT Message Format](#3. MQTT Message Format).
Although there are various variable header contents, the Packet Identifier is an important universal field and has the following 唯一性:
byte
value
byte 1
Message Identifier MSB
byte 2
Message Identifier LSB
Table 2-6 Packet Identifier
This general field exists in many types of messages, see Table 2-7:
Message Type
Is there a message identifier?
CONNECT
NO
CONNACK
NO
PUBLISH
YES (If QoS > 0)
PUBACK
YES
PUBREC
YES
PUBREL
YES
PUBCOMP
YES
SUBSCRIBE
YES
SUBACK
YES
UNSUBSCRIBE
YES
UNSUBACK
YES
PINGREQ
NO
PINGRESP
NO
DISCONNECT
NO
Table 2 - 7 Control Packets that contain a Packet Identifier
The rules for Packet Identifier are as follows:
SUBSCRIBE, UNSUBSCRIBE, and PUBLISH (in the case of QoS > 0) control packets MUST contain a non-zero 16-bit Message Identifier.
Each time a client sends a new packet of one of these types, it MUST assign it a Message Identifier that is not currently in use.
If a client resends a particular control packet, it MUST use the same Message Identifier in subsequent retransmissions of that packet. The Message Identifier can be reused after the client processes the ACK packet corresponding to that particular control packet.
The PUBLISH packet of QoS 1 corresponds to PUBACK.
The PUBLISH packet of QoS 2 corresponds to PUBCOMP.
SUBSCRIBE and UNSUBSCRIBE are corresponding to each other by SUBACK and UNSUBACK.
When the server sends a PUBLISH message with QoS>0, the above rules should also be followed.
A PUBLISH message with QoS=0 is not allowed to contain a message identifier.
The message identifier of the PUBACK, PUBREC, and PUBREL packets must be the same as the value originally sent in the PUBLISH packet.
Similarly, the SUBACK and UNSUBACK packets must have the same message identifier as the SUBSCRIBE and UNSUBSCRIBE packets.
The client and server assign message identifiers independently of each other. Therefore, a client-server combination using the same message identifier can achieve concurrent information exchange.
Some MQTT control messages contain a payload at the end of the packet. For details, see [MQTT message format](#3. MQTT message format). For example, for a PUBLISH message, the payload can be an application message. Table 2 - 8 lists the various control messages that require a payload.
uMQTT software package is independently developed by RT-Thread, based on the client implementation of MQTT 3.1.1 protocol, which provides basic functions for devices to communicate with MQTT Broker. Click here for GitHub homepage and here for Gitee homepage .
The uMQTT package features are as follows:
Implement basic connection, subscription, and publishing functions;
It has multiple heartbeat keep-alive and device reconnection mechanisms to ensure the online status of MQTT and adapt to complex situations;
Supports three types of information quality: QoS=0, QoS=1, QoS=2;
Support multi-client use;
The user-side interface is simple and has multiple external callback functions.
Supports multiple configurable technical parameters, easy to use and convenient for product development;
It has powerful functions, low resource usage, and supports tailorable functions.
The uMQTT software package is mainly used to implement the MQTT protocol on embedded devices. The main work of the software package is based on the MQTT protocol. The software package structure diagram is shown in Figure 4-1:
Figure 4-1 uMQTT structure diagram
The main tasks in the software package implementation process are:
According to the MQTT 3.1.1 protocol, perform package data protocol encapsulation and unpacking;
The transport layer function is adapted to connect to the SAL (Socket Abstraction Layer) layer;
The uMQTT client layer writes an interface that conforms to the application layer based on the protocol packet layer and the transport layer. It implements basic connection, disconnection, subscription, unsubscription, message publishing and other functions. It supports QoS0/1/2 for sending information. It uses the uplink timer to implement multiple heartbeat keep-alive mechanisms and device reconnection mechanisms, increase the online stability of the device, and adapt to complex situations.
As shown in Figure 1-1, to connect to the Broker, the embedded device needs to be used as a client in the MQTT protocol.
In the uMQTT component umqtt.hfile, the MQTT configuration information used to initialize the client is abstracted to form the corresponding data structure:
These configuration information generally need to be filled in and specified before creating a uMQTT client, such as key information such as Broker's "URI", "user name" or "password". Other non-critical information, if not specified, will be umqtt_createcalled in the client creation function umqtt_check_def_infoto assign default values:
static void umqtt_check_def_info(struct umqtt_info *info)
{
if (info)
{
if (info->send_size == 0) { info->send_size = PKG_UMQTT_INFO_DEF_SENDSIZE; }
if (info->recv_size == 0) { info->recv_size = PKG_UMQTT_INFO_DEF_RECVSIZE; }
if (info->reconnect_max_num == 0) { info->reconnect_max_num = PKG_UMQTT_INFO_DEF_RECONNECT_MAX_NUM; }
if (info->reconnect_interval == 0) { info->reconnect_interval = PKG_UMQTT_INFO_DEF_RECONNECT_INTERVAL; }
if (info->keepalive_max_num == 0) { info->keepalive_max_num = PKG_UMQTT_INFO_DEF_KEEPALIVE_MAX_NUM; }
if (info->keepalive_interval == 0) { info->keepalive_interval = PKG_UMQTT_INFO_DEF_HEARTBEAT_INTERVAL; }
if (info->connect_time == 0) { info->connect_time = PKG_UMQTT_INFO_DEF_CONNECT_TIMEOUT; }
if (info->recv_time_ms == 0) { info->recv_time_ms = PKG_UMQTT_INFO_DEF_RECV_TIMEOUT_MS; }
if (info->send_timeout == 0) { info->send_timeout = PKG_UMQTT_INFO_DEF_SEND_TIMEOUT; }
if (info->thread_stack_size == 0) { info->thread_stack_size = PKG_UMQTT_INFO_DEF_THREAD_STACK_SIZE; }
if (info->thread_priority == 0) { info->thread_priority = PKG_UMQTT_INFO_DEF_THREAD_PRIORITY; }
}
}copymistakeCopy Success
However, with only the above information, it is not possible to run an MQTT client. Therefore umqtt.c, in the structure containing umqtt_infoall umqtt_clientthe data used to initialize the client:
The structure and enumeration type definitions of some of the above members can umqtt.hbe viewed in the file. The structure will be initialized after the client function is created umqtt_createand called umqtt_check_def_info:
Initialize the will data structure (if any)
Apply memory for the send and receive buffers
Create a mutex, message queue, and timeout reconnection timer (timeout callback to achieve reconnection + keep alive)
Initialize each linked list
Create umqtt_thread- mqtt data sending and receiving thread
Returns mqtt_clientthe structure address
When the value returned in step 6 is not empty, you can call the function to send a CONNECT message to connect to the Broker umqtt_startthrough LWIP ; after the connection is successful , the thread will be started and the MQTT communication will be started.umqtt_thread
In umqtt_startthe function, the state of the uMQTT client is first set to UMQTT_CS_LINKING, which means 正在连接中. Next, the function is called umqtt_connectto connect the local client to the Broker.
Connecting to a Broker is a two-step process:
Create a socket and establish a link connection with the Broker
Send a CONNECT message to create an MQTT protocol connection
In umqtt_connectthe function, umqtt_trans_connectthe first step is completed by calling the function:
/**
* TCP/TLS Connection Complete for configured transport
*
* @param uri the input server URI address
* @param sock the output socket
*
* @return <0: failed or other error
* =0: success
*/
int umqtt_trans_connect(const char *uri, int *sock)
{
int _ret = 0;
struct addrinfo *addr_res = RT_NULL;
*sock = -1;
/* 域名解析 */
_ret = umqtt_resolve_uri(uri, &addr_res);
if ((_ret < 0) || (addr_res == RT_NULL))
{
LOG_E("resolve uri err");
_ret = UMQTT_FAILED;
goto exit;
}
/* 创建套接字 */
if ((*sock = socket(addr_res->ai_family, SOCK_STREAM, UMQTT_SOCKET_PROTOCOL)) < 0)
{
LOG_E("create socket error!");
_ret = UMQTT_FAILED;
goto exit;
}
/* 设置套接字工作在非阻塞模式下 */
_ret = ioctlsocket(*sock, FIONBIO, 0);
if (_ret < 0)
{
LOG_E(" iocontrol socket error!");
_ret = UMQTT_FAILED;
goto exit;
}
/* 建立连接 */
if ((_ret = connect(*sock, addr_res->ai_addr, addr_res->ai_addrlen)) < 0)
{
LOG_E(" connect err!");
closesocket(*sock);
*sock = -1;
_ret = UMQTT_FAILED;
goto exit;
}
exit:
if (addr_res) {
freeaddrinfo(addr_res);
addr_res = RT_NULL;
}
return _ret;
}copymistakeCopy Success
This function is the core function for uMQTT to establish a connection with Broker through LWIP. And from the [uMQTT framework diagram](#**4.1. uMQTT structure framework**), we can know that this function uses SAL , the socket abstraction layer component, to call the relevant interface to access LWIP. Some of the functions encapsulated by the SAL component ( used to resolve the domain name getaddrinfoin umqtt_resolve_urithe function) are as follows:
When the uMQTT client successfully establishes a link layer connection with the Broker, it will immediately send a CONNECT message to establish an MQTT protocol layer connection.
The uMQTT component uses a clever structure + union to manage all sent and received messages:
The various message types of this structure correspond exactly to the various control message types in [Chapter 2.1.1](#2.1.1 MQTT Control Packet type) (PINGREQ and PINGRESP messages only need two bytes each, refer to here , so there is no need to use structures to manage them). umqtt_encodeDifferent packet assembly functions are called through functions, the structures of the corresponding formats are filled, and then sent to the Broker server:
/**
* packaging the data according to the format
*
* @param type the input packaging type
* @param send_buf the output send buf, result of the package
* @param send_len the output send buffer length
* @param message the input message
*
* @return <=0: failed or other error
* >0: package data length
*/
int umqtt_encode(enum umqtt_type type, rt_uint8_t *send_buf, size_t send_len, struct umqtt_msg *message)
{
int _ret = 0;
switch (type)
{
case UMQTT_TYPE_CONNECT:
_ret = umqtt_connect_encode(send_buf, send_len, &(message->msg.connect));
break;
case UMQTT_TYPE_PUBLISH:
_ret = umqtt_publish_encode(send_buf, send_len, message->header.bits.dup, message->header.bits.qos, &(message->msg.publish));
break;
case UMQTT_TYPE_PUBACK:
_ret = umqtt_puback_encode(send_buf, send_len, message->msg.puback.packet_id);
break;
case UMQTT_TYPE_PUBREC:
// _ret = umqtt_pubrec_encode();
break;
case UMQTT_TYPE_PUBREL:
_ret = umqtt_pubrel_encode(send_buf, send_len, message->header.bits.dup, message->msg.pubrel.packet_id);
break;
case UMQTT_TYPE_PUBCOMP:
_ret = umqtt_pubcomp_encode(send_buf, send_len, message->msg.pubcomp.packet_id);
break;
case UMQTT_TYPE_SUBSCRIBE:
_ret = umqtt_subscribe_encode(send_buf, send_len, &(message->msg.subscribe));
break;
case UMQTT_TYPE_UNSUBSCRIBE:
_ret = umqtt_unsubscribe_encode(send_buf, send_len, &(message->msg.unsubscribe));
break;
case UMQTT_TYPE_PINGREQ:
_ret = umqtt_pingreq_encode(send_buf, send_len);
break;
case UMQTT_TYPE_DISCONNECT:
_ret = umqtt_disconnect_encode(send_buf, send_len);
break;
default:
break;
}
return _ret;
}
copymistakeCopy Success
The macro definition of the MQTT control packet type corresponds to [2.1.1 MQTT Control Packet type](#2.1.1 MQTT Control Packet type):
Since there are many types of messages, let's take the CONNECT message (variable header - "protocol name", "protocol level", "connection flag", "keep alive interval (seconds)", payload - "client identifier", "will topic", "will message", "user name", "password") as an example to briefly describe the package assembly process of uMQTT:
Fill in the default configuration information of the MQTT client
The function is first called MQTTSerialize_connectLengthto calculate the length of the variable header and payload . The obtained len will be passed to umqtt_pkgs_lenthe function as a parameter. Its function is to calculate the number of bytes of the field in the fixed header and add the length of the first byte of the fixed header , which is 1, and compare it with buflen to determine the validity of the packet data.剩余长度
Why is this kind of compound statement used here if (umqtt_pkgs_len(len = MQTTSerialize_connectLength(options)) > buflen) ?
Because the len length we want to get is the value in the fixed header剩余长度 to facilitate the subsequent packet assembly process, and the effective message length buflen = len + 1 + 剩余长度the number of bytes in the field; if the message length is calculated directly, when the value is written later 剩余长度, it is also necessary to subtract its own byte length and the first byte length of the fixed header, which is 1, which is more complicated and cumbersome.
Here are some important structures and unions to correspond to the MQTT protocol:
Fixed header
Please refer to [**2.1. Fixed header**](#**2.1. Fixed header**)
union umqtt_pkgs_connect_sign
{
rt_uint8_t connect_sign;
struct {
rt_uint8_t reserved: 1; /* reserved bits */
rt_uint8_t clean_session: 1; /* clean session bit */
rt_uint8_t will_flag: 1; /* will flag bit */
rt_uint8_t will_Qos: 2; /* will Qos bit */
rt_uint8_t will_retain: 1; /* will retain bit */
rt_uint8_t password_flag: 1; /* password flag bit */
rt_uint8_t username_flag: 1; /* user name flag bit */
} bits;
};
-------------------------------------------------------------------------------
#define UMQTT_SET_CONNECT_FLAGS(user_name_flag, password_flag, will_retain, will_qos, will_flag, clean_session, reserved) \
(((user_name_flag & 0x01) << 7) | \
((password_flag & 0x01) << 6) | \
((will_retain & 0x01) << 5) | \
((will_qos & 0x01) << 3) | \
((will_flag & 0x01) << 2) | \
((clean_session & 0x01) << 1) | \
(reserved & 0x01))
#define UMQTT_DEF_CONNECT_FLAGS (UMQTT_SET_CONNECT_FLAGS(0,0,0,0,0,1,0))copymistakeCopy Success
After the above package assembly process is completed, umqtt_trans_sendthe function will be called to send the send buffer data to the Broker connected to the socket through LWIP:
/**
* TCP/TLS send datas on configured transport.
*
* @param sock the input socket
* @param send_buf the input, transport datas buffer
* @param buf_len the input, transport datas buffer length
* @param timeout the input, tcp/tls transport timeout
*
* @return <0: failed or other error
* =0: success
*/
int umqtt_trans_send(int sock, const rt_uint8_t *send_buf, rt_uint32_t buf_len, int timeout)
{
int _ret = 0;
rt_uint32_t offset = 0U;
while (offset < buf_len)
{
_ret = send(sock, send_buf + offset, buf_len - offset, 0);
if (_ret < 0)
return -errno;
offset += _ret;
}
return _ret;
}copymistakeCopy Success
When uMQTT completes sending the CONNECT message, it calls umqtt_handle_readpacketthe function (after completing the CONNECT process, the function will also umqtt_threadbe called cyclically in the thread to send and receive data) to read the Broker's reply and unpack the received data:
sal_recvfromIt can be seen that this function is actually a wrapper for the SAL layer function, which is used to read data of length buf_len from the corresponding sock to recv_buf.
Read the Remaining length field of the Fixed header and parse the remaining length
I will not go into details here. The algorithm for this part refers to the rules in [2.1.3 Remaining Length](#2.1.3 Remaining Length).
Read remaining data - variable header + payload
No more details.
Parse data packets and process them accordingly according to different message types
For structure members, please refer to **[4.4. uMQTT sends a package](#4.4. uMQTT sends a package)**
/**
* parse the data according to the format
*
* @param recv_buf the input, the raw buffer data, of the correct length determined by the remaining length field
* @param recv_buf_len the input, the length in bytes of the data in the supplied buffer
* @param message the output datas
*
* @return <0: failed or other error
* =0: success
*/
int umqtt_decode(rt_uint8_t *recv_buf, size_t recv_buf_len, struct umqtt_msg *message)
{
int _ret = 0;
rt_uint8_t* curdata = recv_buf;
enum umqtt_type type;
if (message == RT_NULL)
{
_ret = UMQTT_INPARAMS_NULL;
LOG_E(" umqtt decode inparams null!");
goto exit;
}
message->header.byte = umqtt_readChar(&curdata);
type = message->header.bits.type;
switch (type)
{
case UMQTT_TYPE_CONNACK:
_ret = umqtt_connack_decode(&(message->msg.connack), recv_buf, recv_buf_len);
break;
case UMQTT_TYPE_PUBLISH:
_ret = umqtt_publish_decode(message, recv_buf, recv_buf_len);
break;
case UMQTT_TYPE_PUBACK:
_ret = umqtt_puback_decode(message, recv_buf, recv_buf_len);
break;
case UMQTT_TYPE_PUBREC:
// _ret = umqtt_pubrec_decode();
break;
case UMQTT_TYPE_PUBREL:
// _ret = umqtt_pubrel_decode();
break;
case UMQTT_TYPE_PUBCOMP:
// _ret = umqtt_pubcomp_decode();
break;
case UMQTT_TYPE_SUBACK:
_ret = umqtt_suback_decode(&(message->msg.suback), recv_buf, recv_buf_len);
break;
case UMQTT_TYPE_UNSUBACK:
_ret = umqtt_unsuback_decode(&(message->msg.unsuback), recv_buf, recv_buf_len);
break;
case UMQTT_TYPE_PINGRESP:
// _ret = umqtt_pingresp_encode();
break;
default:
break;
}
exit:
return _ret;
}copymistakeCopy Success
There are many types of messages, but we will only take the CONNECT message as an example:
It is still the familiar routine: read Fixed header → read Remaining length → read Variable header to parse related flags
UMQTT_TYPE_CONNACK:
Call set_uplink_recon_tick(client, UPLINK_NEXT_TICK)the function to set the next reconnection tick value, and call set_connect_status(client, UMQTT_CS_LINKED)the function to set the uMQTT client state 已连接.
At this point, the sending and receiving process of the CONNECT message has been completed. The next step is to start umqtt_threadthe thread and call umqtt_handle_readpacketthe function to process the data message received from the Broker server. The message processing process is similar to the above and will not be repeated. For the specific content and related processes, please refer to Figure 4-2:
In summary, a flowchart is used to briefly describe some important function calls of uMQTT, as shown in Figure 4-3. Since many details are difficult to show, it is also necessary to understand its functional flow from the actual code.