MQTT-umqtt
Assoc. Prof. Wiroon Sriborrirux, Founder of Advance Innovation Center (AIC) and Bangsaen Design House (BDH), Electrical Engineering Department, Faculty of Engineering, Burapha University
MQTT (Message Queuing Telemetry Transport) is a machine-to-machine (M2M) / Internet of Things (IoT) connectivity protocol. It is designed for constrained devices and for low-bandwidth, high-latency, or unreliable networks. It is a lightweight communication protocol based on the publish/subscribe model and is built on top of TCP/IP. IBM released the protocol in 1999. At present, the latest version of the protocol is v5.0, and the most commonly used version is v3.1.1.
The following example illustrates a practical application scenario for MQTT. Figure 1-1 shows a communication network system based on the MQTT protocol:
Figure 1-1 Communication example based on MQTT
Definition of terms:
Publisher
Broker
Subscriber
Topic - the topic for publish/subscribe
Process overview: In the figure above, each sensor acts as a publisher. For example, the humidity sensor and the temperature sensor periodically publish to two topics, named "Moisture" and "Temp", on the MQTT Broker they are connected to. Along with each topic they send the humidity or temperature value, which is called the "message". The clients act as subscribers. For example, if the mobile phone app subscribes to the "Temp" topic on the Broker, the phone receives the temperature values that the temperature sensor publishes to the Broker.
Additional notes:
The roles of publisher and subscriber are not fixed but relative. A publisher can also subscribe to topics on a broker, and a subscriber can also publish to topics on a broker. In other words, the same client can be both a publisher and a subscriber.
The Broker can be an online cloud server or a device on the local LAN. Depending on requirements, the Broker itself may also provide some topic subscribing and publishing functions.
For more reference material, see the MQTT Chinese website or the official MQTT website.
Learning objectives:
Understand the MQTT protocol and its message format
Understand the relationship between uMQTT and LWIP
Understand the implementation principle of uMQTT
Any protocol, public or private, is composed of message packets that are predefined and constrained by certain rules, and MQTT is no exception. In the MQTT protocol, every packet consists of at most three parts: fixed header + variable header + payload, as shown in Table 2-1:
Table 2-1 MQTT data packet format
Of these, the fixed header is required, while the variable header and payload are optional. In theory, therefore, the minimum length of an MQTT packet is 2 bytes, which keeps the protocol overhead to a minimum. Next, we take MQTT 3.1.1 as an example to introduce the detailed format of each part of the message (for detailed usage, see [**4. Implementation of uMQTT**](#4. Implementation of uMQTT)).
Table 2-2 Fixed header format
The fixed header consists of at least two bytes, as shown in Table 2-2. The high 4 bits of the first byte give the type of the packet ([MQTT Control Packet type](#2.1.1 MQTT Control Packet type)); see Table 2-3. The low 4 bits hold the flags specific to that packet type ([Flags specific to each MQTT Control Packet type](#2.1.2 Flags specific to each MQTT Control Packet type)); see Table 2-4. The second byte onward, up to 4 bytes, encodes the length in bytes of the rest of the packet ([Remaining Length](#2.1.3 Remaining Length)); see Table 2-5.
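As a minimal sketch of the layout just described, the first byte of the fixed header can be split into its type and flag nibbles with a shift and a mask. The names `pkt_type`, `split_byte0`, and `make_byte0` are hypothetical and chosen for illustration only; they are not uMQTT identifiers. The enum values follow Table 2-3.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical names; the numeric values follow Table 2-3. */
enum pkt_type {
    PKT_CONNECT = 1, PKT_CONNACK = 2, PKT_PUBLISH = 3, PKT_PUBACK = 4,
    PKT_PUBREC = 5, PKT_PUBREL = 6, PKT_PUBCOMP = 7, PKT_SUBSCRIBE = 8,
    PKT_SUBACK = 9, PKT_UNSUBSCRIBE = 10, PKT_UNSUBACK = 11,
    PKT_PINGREQ = 12, PKT_PINGRESP = 13, PKT_DISCONNECT = 14
};

struct fixed_header_byte0 {
    uint8_t type;   /* bits 7..4 of Bytes[0] */
    uint8_t flags;  /* bits 3..0 of Bytes[0] */
};

/* Split the first byte of a fixed header into type and flags. */
struct fixed_header_byte0 split_byte0(uint8_t byte0)
{
    struct fixed_header_byte0 h;
    h.type  = (uint8_t)(byte0 >> 4);
    h.flags = (uint8_t)(byte0 & 0x0F);
    return h;
}

/* Combine a type and a flag nibble into the first byte. */
uint8_t make_byte0(uint8_t type, uint8_t flags)
{
    assert(type <= 15 && flags <= 15);
    return (uint8_t)((type << 4) | flags);
}
```

For example, the byte 0x82 splits into type 8 (SUBSCRIBE) and flags 0x2.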
Table 2-3 Control Packet type
Table 2-4 Control Packet Flags
**Bits[3..0]** of **Bytes[0]** in the fixed header contain the flags specific to each MQTT control packet type, as shown in Table 2-4. Flags marked "Reserved" are reserved for future versions, but they must still be set to the values listed in the table when a packet is transmitted. If a receiver detects flags that do not match the table, it must close the network connection.
Three flags are used in version 3.1.1, all of them in PUBLISH packets:
DUP = duplicate delivery flag of a PUBLISH packet
QoS = Quality of Service level of a PUBLISH packet
RETAIN = retain flag of a PUBLISH packet
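The flag rules above can be sketched as a small validity check plus a decoder for the three PUBLISH flags. This is an illustrative sketch, not uMQTT code: `flags_are_valid` and `decode_publish_flags` are hypothetical names, and packet types are passed as the numeric values from Table 2-3. The rejection of QoS value 3 comes from the MQTT 3.1.1 specification, which forbids setting both QoS bits.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Check the low nibble of Bytes[0] against Table 2-4. */
bool flags_are_valid(uint8_t type, uint8_t flags)
{
    assert(flags <= 0x0F);
    if (type == 0 || type >= 15)
        return false;                      /* reserved packet types */
    switch (type) {
    case 3:                                /* PUBLISH: DUP, QoS, RETAIN vary */
        return ((flags >> 1) & 0x03) != 3; /* both QoS bits set is illegal */
    case 6:                                /* PUBREL */
    case 8:                                /* SUBSCRIBE */
    case 10:                               /* UNSUBSCRIBE */
        return flags == 0x02;
    default:                               /* all other types: 0000 */
        return flags == 0x00;
    }
}

struct publish_flags {
    bool    dup;     /* bit 3 */
    uint8_t qos;     /* bits 2..1 */
    bool    retain;  /* bit 0 */
};

/* Decode the flag nibble of a PUBLISH packet. */
struct publish_flags decode_publish_flags(uint8_t flags)
{
    struct publish_flags f;
    assert(flags <= 0x0F);
    f.dup    = (flags & 0x08) != 0;
    f.qos    = (uint8_t)((flags >> 1) & 0x03);
    f.retain = (flags & 0x01) != 0;
    return f;
}
```

A receiver following the rule in the text would close the network connection whenever `flags_are_valid` returns false.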
Table 2-5 Remaining Length
The Remaining Length is the number of bytes remaining in the current packet, including the variable header and the payload. It does not include the bytes used to encode the Remaining Length field itself, nor the first byte of the fixed header; in other words, it excludes the entire fixed header.
The Remaining Length field uses a variable-length encoding scheme, as shown in Table 2-5:
Values less than 128 are encoded using a single byte.
Larger values are handled as follows: the least significant 7 bits of each byte encode data, and the most significant bit indicates whether more bytes follow. Each byte therefore carries 7 bits of the value (128 possible values) plus a continuation bit. The continuation bit only signals that another byte follows and is not part of the length value.
This field can be up to 4 bytes.
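The encoding rules above can be sketched as a pair of functions. This is a minimal illustration of the variable-length scheme, not the code used by uMQTT; the names `encode_remaining_length` and `decode_remaining_length` are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define REMAINING_LENGTH_MAX 268435455u  /* largest value that fits in 4 bytes */

/* Encode value into out; returns bytes written (1..4), or 0 if too large. */
size_t encode_remaining_length(uint8_t out[4], uint32_t value)
{
    size_t n = 0;
    assert(out != NULL);
    if (value > REMAINING_LENGTH_MAX)
        return 0;
    do {
        uint8_t byte = (uint8_t)(value % 128);  /* low 7 bits carry data */
        value /= 128;
        if (value > 0)
            byte |= 0x80;                       /* continuation bit */
        out[n++] = byte;
    } while (value > 0);
    return n;
}

/* Decode from in; returns bytes consumed (1..4), or 0 if incomplete or malformed. */
size_t decode_remaining_length(const uint8_t *in, size_t in_len, uint32_t *value)
{
    uint32_t result = 0, multiplier = 1;
    size_t i;
    assert(in != NULL && value != NULL);
    for (i = 0; i < in_len && i < 4; i++) {
        result += (uint32_t)(in[i] & 0x7F) * multiplier;
        if ((in[i] & 0x80) == 0) {              /* last byte of the field */
            *value = result;
            return i + 1;
        }
        multiplier *= 128;
    }
    return 0;
}
```

For example, 321 is encoded as the two bytes 0xC1 0x02, because 321 = 65 + 2 × 128 and the first byte carries the continuation bit (65 | 0x80 = 0xC1).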
Some MQTT control packets contain a variable header part, which is located between the fixed header and the payload. The content of the variable header varies depending on the control packet type ([MQTT Control Packet type](#2.1.1 MQTT Control Packet type)); for example, for a CONNECT type packet, its variable header contains four fields: "Protocol Name", "Protocol Level", "Connect Flags" and "Keep Alive". For other variable headers, see [MQTT Message Format](#3. MQTT Message Format).
Although the contents of the variable header vary, the Packet Identifier is an important common field that must satisfy certain uniqueness rules:
Table 2-6 Packet Identifier
This common field appears in many packet types; see Table 2-7:
Table 2-7 Control Packets that contain a Packet Identifier
The rules for Packet Identifier are as follows:
SUBSCRIBE, UNSUBSCRIBE, and PUBLISH (when QoS > 0) control packets MUST contain a non-zero 16-bit Packet Identifier.
Each time a client sends a new packet of one of these types, it MUST assign it a Packet Identifier that is not currently in use.
If a client resends a particular control packet, it MUST use the same Packet Identifier in every retransmission of that packet. The Packet Identifier becomes available for reuse after the client has processed the acknowledgment that corresponds to that control packet.
For a PUBLISH packet with QoS 1, the corresponding acknowledgment is PUBACK.
For a PUBLISH packet with QoS 2, the corresponding acknowledgment is PUBCOMP.
For SUBSCRIBE and UNSUBSCRIBE, the corresponding acknowledgments are SUBACK and UNSUBACK, respectively.
When the server sends a PUBLISH packet with QoS > 0, the same rules apply.
A PUBLISH packet with QoS = 0 MUST NOT contain a Packet Identifier.
The Packet Identifier of a PUBACK, PUBREC, or PUBREL packet must be the same as the one originally sent in the PUBLISH packet.
Similarly, SUBACK and UNSUBACK packets must carry the same Packet Identifier as the SUBSCRIBE and UNSUBSCRIBE packets they acknowledge.
The client and the server assign Packet Identifiers independently of each other. As a result, a client and a server can take part in concurrent message exchanges that happen to use the same identifier.
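As a rough sketch of two of these rules, the code below hands out non-zero identifiers and writes them in the MSB-first order of Table 2-6. All names are hypothetical. The allocator is deliberately simplified: it only skips zero on wrap-around, and it is an assumption of this sketch that the caller separately ensures an identifier is not still in use by an unacknowledged packet.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical allocator state: the last identifier handed out. */
struct packet_id_gen {
    uint16_t last;
};

/* Return the next identifier, never 0. Does NOT track identifiers in flight. */
uint16_t next_packet_id(struct packet_id_gen *gen)
{
    assert(gen != NULL);
    gen->last = (uint16_t)(gen->last + 1);
    if (gen->last == 0)
        gen->last = 1;   /* 0 is not a valid Packet Identifier */
    return gen->last;
}

/* Write an identifier as two bytes, most significant byte first. */
void write_packet_id(uint8_t out[2], uint16_t id)
{
    assert(id != 0);
    out[0] = (uint8_t)(id >> 8);    /* MSB */
    out[1] = (uint8_t)(id & 0xFF);  /* LSB */
}

/* Read an identifier from two bytes, most significant byte first. */
uint16_t read_packet_id(const uint8_t in[2])
{
    return (uint16_t)((in[0] << 8) | in[1]);
}
```

When a packet is retransmitted, the sender would reuse the stored identifier instead of calling `next_packet_id` again.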
Some MQTT control packets contain a payload at the end of the packet. For details, see [MQTT message format](#3. MQTT message format). For example, for a PUBLISH packet, the payload is the application message. Table 2-8 lists which control packets require a payload.
Table 2-8 Control Packets that contain a Payload
Since there are many formats for Variable header and Payload, this article will not go into details. For details, please refer to the following links:
The uMQTT software package is independently developed by RT-Thread. It is a client implementation of the MQTT 3.1.1 protocol and provides the basic functions a device needs to communicate with an MQTT Broker. The project has homepages on GitHub and Gitee.
The features of the uMQTT package are as follows:
Implements the basic connection, subscription, and publishing functions;
Provides multiple heartbeat keep-alive and device reconnection mechanisms to keep the MQTT client online and to cope with complex situations;
Supports three quality of service levels: QoS=0, QoS=1, QoS=2;
Supports multiple clients;
Offers a simple user-side interface with multiple external callback functions;
Supports multiple configurable parameters, making it easy to use and convenient for product development;
Provides rich functionality with low resource usage, and its features can be tailored.
The uMQTT software package is mainly used to implement the MQTT protocol on embedded devices, and its main work is built around that protocol. The structure of the package is shown in Figure 4-1:
Figure 4-1 uMQTT structure diagram
The main tasks in implementing the package are:
Encapsulate and unpack packet data according to the MQTT 3.1.1 protocol;
Adapt the transport layer functions to the SAL (Socket Abstraction Layer);
Build the uMQTT client layer on top of the protocol packet layer and the transport layer, exposing an interface suited to the application layer. This layer implements basic connection, disconnection, subscription, unsubscription, and message publishing. It supports QoS 0/1/2 for sending messages. It uses the uplink timer to implement multiple heartbeat keep-alive and device reconnection mechanisms, which improve the online stability of the device and help it cope with complex situations.
As shown in Figure 1-1, to connect to the Broker, the embedded device needs to be used as a client in the MQTT protocol.
In the uMQTT component's `umqtt.h` file, the MQTT configuration information used to initialize the client is abstracted into a corresponding data structure, `umqtt_info`.
This configuration information generally needs to be filled in before creating a uMQTT client, especially key items such as the Broker's URI, user name, or password. Other non-critical items, if not specified, are assigned default values by `umqtt_check_def_info`, which is called in the client creation function `umqtt_create`.
However, this information alone is not enough to run an MQTT client. Therefore, in `umqtt.c`, the `umqtt_client` structure, which contains `umqtt_info`, holds all the data used to initialize the client.
The structure and enumeration type definitions of some of these members can be viewed in the `umqtt.h` file. The structure is initialized in the client creation function `umqtt_create`, after it calls `umqtt_check_def_info`:
1. Initialize the will data structure (if any)
2. Allocate memory for the send and receive buffers
3. Create a mutex, a message queue, and a timeout reconnection timer (the timeout callback implements reconnection and keep-alive)
4. Initialize each linked list
5. Create `umqtt_thread`, the MQTT data sending and receiving thread
6. Return the address of the `mqtt_client` structure
When the value returned in step 6 is not empty, you can call the `umqtt_start` function to send a CONNECT message and connect to the Broker through LWIP. After the connection succeeds, the `umqtt_thread` thread is started and MQTT communication begins.
In the `umqtt_start` function, the state of the uMQTT client is first set to `UMQTT_CS_LINKING`, which means "connecting". Next, the `umqtt_connect` function is called to connect the local client to the Broker.
Connecting to a Broker is a two-step process:
Create a socket and establish a TCP connection with the Broker
Send a CONNECT message to create an MQTT protocol connection
In the `umqtt_connect` function, the first step is completed by calling the `umqtt_trans_connect` function.
This is the core function through which uMQTT establishes a connection with the Broker via LWIP. From the [uMQTT framework diagram](#**4.1. uMQTT structure framework**), we can see that it accesses LWIP by calling the interfaces of SAL, the socket abstraction layer component. It relies on functions encapsulated by the SAL component, such as `getaddrinfo`, which is used in the `umqtt_resolve_uri` function to resolve the domain name.
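Before a name can be resolved, the Broker URI has to be split into a host and a port. The sketch below shows one way to do that, assuming a URI of the form `tcp://host:port`; both that form and the helper name `split_broker_uri` are assumptions made for illustration, and this is not the implementation of `umqtt_resolve_uri`. IPv6 literals are not handled.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Split "tcp://host:port" into NUL-terminated host and port strings.
 * Returns false if the URI does not match that form or a buffer is too small. */
bool split_broker_uri(const char *uri,
                      char *host, size_t host_size,
                      char *port, size_t port_size)
{
    const char *prefix = "tcp://";
    const char *host_begin, *colon;
    size_t host_len, port_len;

    assert(uri != NULL && host != NULL && port != NULL);
    if (strncmp(uri, prefix, strlen(prefix)) != 0)
        return false;
    host_begin = uri + strlen(prefix);
    colon = strrchr(host_begin, ':');          /* last ':' separates the port */
    if (colon == NULL)
        return false;
    host_len = (size_t)(colon - host_begin);
    port_len = strlen(colon + 1);
    if (host_len == 0 || port_len == 0)
        return false;
    if (host_len >= host_size || port_len >= port_size)
        return false;
    memcpy(host, host_begin, host_len);
    host[host_len] = '\0';
    memcpy(port, colon + 1, port_len + 1);     /* includes the terminating NUL */
    return true;
}
```

The two resulting strings have the shape that `getaddrinfo` expects for its node and service arguments, so they could be passed to it directly in the resolution step.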
Once the uMQTT client has established a TCP connection with the Broker, it immediately sends a CONNECT message to establish the MQTT protocol-layer connection.
The uMQTT component uses a structure combined with a union to manage all sent and received messages.
The message types in this structure correspond exactly to the control packet types in [Chapter 2.1.1](#2.1.1 MQTT Control Packet type). PINGREQ and PINGRESP messages need only two bytes each, so no structures are needed to manage them. The `umqtt_encode` function calls the packet assembly function for each message type, fills in the structure of the corresponding format, and the packet is then sent to the Broker.
The macro definitions of the MQTT control packet types correspond to [2.1.1 MQTT Control Packet type](#2.1.1 MQTT Control Packet type).
Since there are many types of messages, let's take the CONNECT message (variable header - "protocol name", "protocol level", "connection flag", "keep alive interval (seconds)", payload - "client identifier", "will topic", "will message", "user name", "password") as an example to briefly describe the package assembly process of uMQTT:
Fill in the default configuration information of the MQTT client
Call the `umqtt_encode` → `umqtt_connect_encode` encoding function (which only wraps `MQTTSerialize_connect`) to assemble the packet:
The function first calls `MQTTSerialize_connectLength` to calculate the combined length of the variable header and payload. The resulting len is passed as a parameter to the `umqtt_pkgs_len` function, which works out the number of bytes needed by the Remaining Length field in the fixed header and adds the first byte of the fixed header (1 byte) to obtain the complete packet length. That result is compared with buflen to determine whether the packet is valid for the buffer.
Why is the compound statement `if (umqtt_pkgs_len(len = MQTTSerialize_connectLength(options)) > buflen)` used here? Because the len we want to keep is the value of the Remaining Length field in the fixed header, which simplifies the subsequent packet assembly, while the complete packet length that is checked against buflen is len + 1 + the number of bytes in the Remaining Length field. If the complete packet length were computed directly, then writing the Remaining Length value later would require subtracting the field's own byte length and the 1-byte first byte of the fixed header, which is more cumbersome.
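The length check and assembly described above can be sketched for a minimal CONNECT packet that carries only a client identifier with the Clean Session flag set (no will, user name, or password). This is an illustrative sketch under those assumptions, not the code of `MQTTSerialize_connect` or `umqtt_pkgs_len`; the function names are hypothetical. It mirrors the pattern in the text: keep len as the Remaining Length value, but test the complete packet length against the buffer size.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Bytes needed by the Remaining Length field for a given value. */
size_t remaining_length_size(size_t rem_len)
{
    if (rem_len < 128)     return 1;
    if (rem_len < 16384)   return 2;
    if (rem_len < 2097152) return 3;
    return 4;
}

/* Complete packet = first byte (1) + Remaining Length field + rem_len. */
size_t packet_total_length(size_t rem_len)
{
    return 1 + remaining_length_size(rem_len) + rem_len;
}

/* Build a minimal CONNECT packet; returns bytes written, or 0 on error. */
size_t serialize_connect(uint8_t *buf, size_t buflen,
                         const char *client_id, uint16_t keep_alive)
{
    size_t id_len, len, rem, n = 0;

    assert(buf != NULL && client_id != NULL);
    id_len = strlen(client_id);
    if (id_len > 65535)
        return 0;
    /* Variable header is 10 bytes; payload is a 2-byte length plus the id. */
    if (packet_total_length(len = 10 + 2 + id_len) > buflen)
        return 0;

    buf[n++] = 0x10;                          /* type CONNECT, flags 0000 */
    rem = len;
    do {                                      /* Remaining Length field */
        uint8_t b = (uint8_t)(rem % 128);
        rem /= 128;
        if (rem > 0)
            b |= 0x80;
        buf[n++] = b;
    } while (rem > 0);
    buf[n++] = 0x00;                          /* protocol name length MSB */
    buf[n++] = 0x04;                          /* protocol name length LSB */
    memcpy(buf + n, "MQTT", 4);               /* protocol name */
    n += 4;
    buf[n++] = 0x04;                          /* protocol level 4 = MQTT 3.1.1 */
    buf[n++] = 0x02;                          /* connect flags: Clean Session */
    buf[n++] = (uint8_t)(keep_alive >> 8);    /* keep alive MSB (seconds) */
    buf[n++] = (uint8_t)(keep_alive & 0xFF);  /* keep alive LSB */
    buf[n++] = (uint8_t)(id_len >> 8);        /* client identifier length */
    buf[n++] = (uint8_t)(id_len & 0xFF);
    memcpy(buf + n, client_id, id_len);       /* client identifier */
    n += id_len;
    return n;
}
```

With client identifier "abc" and a keep-alive of 60 seconds, len is 15 and the complete packet is 17 bytes, so a 16-byte buffer is rejected.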
Some important structures and unions correspond directly to parts of the MQTT protocol:
Fixed header
Please refer to [**2.1. Fixed header**](#**2.1. Fixed header**)
CONNECT flag
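The CONNECT flags byte can be sketched with explicit bit masks, following the bit positions of the MQTT 3.1.1 CONNECT flags (User Name, Password, Will Retain, Will QoS, Will Flag, Clean Session, Reserved). The structure and function names below are hypothetical. Masks are used here instead of C bit-fields because the ordering of bit-fields within a byte is implementation-defined; this is a design choice of the sketch and says nothing about how uMQTT lays out its own types.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

struct connect_flags {
    bool    user_name;      /* bit 7 */
    bool    password;       /* bit 6 */
    bool    will_retain;    /* bit 5 */
    uint8_t will_qos;       /* bits 4..3, values 0..2 */
    bool    will;           /* bit 2 */
    bool    clean_session;  /* bit 1 */
};

/* Pack the flags into one byte; bit 0 is reserved and stays 0. */
uint8_t pack_connect_flags(const struct connect_flags *f)
{
    uint8_t b = 0;
    assert(f != NULL && f->will_qos <= 2);
    if (f->user_name)     b |= 0x80;
    if (f->password)      b |= 0x40;
    if (f->will_retain)   b |= 0x20;
    b |= (uint8_t)(f->will_qos << 3);
    if (f->will)          b |= 0x04;
    if (f->clean_session) b |= 0x02;
    return b;
}

/* Unpack a flags byte into its fields. */
struct connect_flags unpack_connect_flags(uint8_t b)
{
    struct connect_flags f;
    f.user_name     = (b & 0x80) != 0;
    f.password      = (b & 0x40) != 0;
    f.will_retain   = (b & 0x20) != 0;
    f.will_qos      = (uint8_t)((b >> 3) & 0x03);
    f.will          = (b & 0x04) != 0;
    f.clean_session = (b & 0x02) != 0;
    return f;
}
```

For example, a client that sends a user name and password with a clean session, and no will, produces the flags byte 0xC2.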
After the packet assembly process above is complete, the `umqtt_trans_send` function is called to send the data in the send buffer, through LWIP, to the Broker connected on the socket.
When uMQTT has finished sending the CONNECT message, it calls the `umqtt_handle_readpacket` function to read the Broker's reply and unpack the received data. After the CONNECT process completes, the same function is also called cyclically in the `umqtt_thread` thread to send and receive data.
The key steps are briefly described below:
Read the first byte of the Fixed header
Here the `umqtt_trans_recv` function is called to read the socket data. It is actually a wrapper around the SAL layer function `sal_recvfrom`, and it reads buf_len bytes of data from the corresponding sock into recv_buf.
Read the Remaining length field of the Fixed header and parse the remaining length
I will not go into details here. The algorithm for this part refers to the rules in [2.1.3 Remaining Length](#2.1.3 Remaining Length).
Read remaining data - variable header + payload
No more details.
Parse the packet and process it according to its message type
Here is a key structure and unpacking function:
For structure members, please refer to **[4.4. uMQTT sends a package](#4.4. uMQTT sends a package)**
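The first three reading steps above can be sketched against an in-memory byte stream. In this sketch `mem_stream` and `mem_read` are hypothetical stand-ins for the socket and its receive call, and `read_packet` is an illustrative name rather than the uMQTT function; the sketch only shows the order of the reads.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical byte source standing in for a socket. */
struct mem_stream {
    const uint8_t *data;
    size_t len;
    size_t pos;
};

/* Read up to want bytes; returns the number of bytes actually read. */
size_t mem_read(struct mem_stream *s, uint8_t *out, size_t want)
{
    size_t avail = s->len - s->pos;
    size_t n = want < avail ? want : avail;
    memcpy(out, s->data + s->pos, n);
    s->pos += n;
    return n;
}

/* Read one whole packet into buf; returns its total length, or 0 on error. */
size_t read_packet(struct mem_stream *s, uint8_t *buf, size_t buflen)
{
    size_t n, i;
    uint32_t rem = 0, multiplier = 1;

    assert(s != NULL && buf != NULL);
    /* Step 1: first byte of the fixed header (type and flags). */
    if (buflen < 2 || mem_read(s, buf, 1) != 1)
        return 0;
    n = 1;
    /* Step 2: Remaining Length field, one byte at a time, at most 4 bytes. */
    for (i = 0; ; i++) {
        if (i == 4 || n >= buflen || mem_read(s, buf + n, 1) != 1)
            return 0;
        rem += (uint32_t)(buf[n] & 0x7F) * multiplier;
        multiplier *= 128;
        if ((buf[n++] & 0x80) == 0)
            break;
    }
    /* Step 3: variable header and payload. */
    if (rem > buflen - n || mem_read(s, buf + n, rem) != rem)
        return 0;
    return n + rem;
}
```

After `read_packet` returns, the fourth step would dispatch on the packet type, which is the high nibble of `buf[0]`.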
There are many types of messages; here we take only the Broker's reply to the CONNECT message as an example:
It follows the familiar routine: read the Fixed header → read the Remaining Length → read the Variable header and parse the related flags
UMQTT_TYPE_CONNACK:
Call `set_uplink_recon_tick(client, UPLINK_NEXT_TICK)` to set the next reconnection tick value, and call `set_connect_status(client, UMQTT_CS_LINKED)` to set the uMQTT client state to "connected".
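For reference, a CONNACK packet in MQTT 3.1.1 is always four bytes: the first byte 0x20, a Remaining Length of 2, an acknowledge-flags byte whose bit 0 is Session Present, and a return code in which 0 means the connection was accepted. The sketch below parses that layout; `struct connack` and `parse_connack` are hypothetical names and this is not the uMQTT unpacking function.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

struct connack {
    bool    session_present;  /* bit 0 of the acknowledge flags */
    uint8_t return_code;      /* 0 = connection accepted */
};

/* Parse a complete CONNACK packet; returns false if it is malformed. */
bool parse_connack(const uint8_t *pkt, size_t len, struct connack *out)
{
    assert(pkt != NULL && out != NULL);
    if (len != 4)
        return false;
    if (pkt[0] != 0x20)       /* type 2 (CONNACK), flags 0000 */
        return false;
    if (pkt[1] != 0x02)       /* Remaining Length is always 2 */
        return false;
    if (pkt[2] & 0xFE)        /* bits 7..1 of the flags are reserved */
        return false;
    out->session_present = (pkt[2] & 0x01) != 0;
    out->return_code = pkt[3];
    return true;
}
```

A client would move to its connected state only when parsing succeeds and `return_code` is 0.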
At this point, the sending and receiving process for the CONNECT message is complete. The next step is to start the `umqtt_thread` thread, which calls the `umqtt_handle_readpacket` function to process the packets received from the Broker. The processing is similar to the above and is not repeated here. For the specific content and related flow, refer to Figure 4-2:
Figure 4-2 MQTT communication flow chart
In summary, Figure 4-3 uses a flowchart to outline some important uMQTT function calls. Since many details are hard to show in a diagram, the actual code is still needed to understand the full flow.
Figure 4-3 umqtt important function flow chart
Table 2-1 MQTT data packet format

| [Fixed header](#2.1. Fixed header) (at least 2 bytes) | [Variable header](#2.2. Variable header) | [Payload](#2.3. Payload*) |
| --- | --- | --- |
| Present in all MQTT control packets | Present in some MQTT control packets | Present in some MQTT control packets |
| Bytes[0] Bytes[1]... | ... ... | ...Bytes[N-1] Bytes[N] |
Table 2-2 Fixed header format

| Byte | Bits 7-4 | Bits 3-0 |
| --- | --- | --- |
| Bytes[0] | MQTT Control Packet type | Flags specific to each MQTT Control Packet type |
| Bytes[1]... | Remaining Length (bits 7-0) | |
Table 2-3 Control Packet type

| Name | Value | Bytes[0] | Description |
| --- | --- | --- | --- |
| Reserved | 0 | 0x0* | Reserved |
| CONNECT | 1 | 0x1* | Client requests connection |
| CONNACK | 2 | 0x2* | Server connection acknowledgment |
| PUBLISH | 3 | 0x3* | Publish a message |
| PUBACK | 4 | 0x4* | Publish acknowledgment (QoS 1) |
| PUBREC | 5 | 0x5* | Publish received (QoS 2, guaranteed delivery part 1) |
| PUBREL | 6 | 0x6* | Publish release (QoS 2, guaranteed delivery part 2) |
| PUBCOMP | 7 | 0x7* | Publish complete (QoS 2, guaranteed delivery part 3) |
| SUBSCRIBE | 8 | 0x8* | Client subscription request |
| SUBACK | 9 | 0x9* | Server subscription acknowledgment |
| UNSUBSCRIBE | 10 | 0xA* | Client unsubscribe request |
| UNSUBACK | 11 | 0xB* | Server unsubscribe acknowledgment |
| PINGREQ | 12 | 0xC* | Client heartbeat (PING) request |
| PINGRESP | 13 | 0xD* | Server heartbeat (PING) response |
| DISCONNECT | 14 | 0xE* | Client is about to disconnect |
| Reserved | 15 | 0xF* | Reserved |
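Table 2-4 Control Packet Flags

| Control message type | Fixed header flags | Bit 3 | Bit 2 | Bit 1 | Bit 0 |
| --- | --- | --- | --- | --- | --- |
| CONNECT | Reserved | 0 | 0 | 0 | 0 |
| CONNACK | Reserved | 0 | 0 | 0 | 0 |
| PUBLISH | Used in MQTT 3.1.1 | DUP | QoS | QoS | RETAIN |
| PUBACK | Reserved | 0 | 0 | 0 | 0 |
| PUBREC | Reserved | 0 | 0 | 0 | 0 |
| PUBREL | Reserved | 0 | 0 | 1 | 0 |
| PUBCOMP | Reserved | 0 | 0 | 0 | 0 |
| SUBSCRIBE | Reserved | 0 | 0 | 1 | 0 |
| SUBACK | Reserved | 0 | 0 | 0 | 0 |
| UNSUBSCRIBE | Reserved | 0 | 0 | 1 | 0 |
| UNSUBACK | Reserved | 0 | 0 | 0 | 0 |
| PINGREQ | Reserved | 0 | 0 | 0 | 0 |
| PINGRESP | Reserved | 0 | 0 | 0 | 0 |
| DISCONNECT | Reserved | 0 | 0 | 0 | 0 |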
Table 2-5 Remaining Length

| Number of bytes | From (Bytes[1], Bytes[2], Bytes[3], Bytes[4]) | To (Bytes[1], Bytes[2], Bytes[3], Bytes[4]) |
| --- | --- | --- |
| 1 | 0 (0x00) | 127 (0x7F) |
| 2 | 128 (0x80, 0x01) | 16 383 (0xFF, 0x7F) |
| 3 | 16 384 (0x80, 0x80, 0x01) | 2 097 151 (0xFF, 0xFF, 0x7F) |
| 4 | 2 097 152 (0x80, 0x80, 0x80, 0x01) | 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F) |
Table 2-6 Packet Identifier

| Byte | Value |
| --- | --- |
| byte 1 | Message Identifier MSB |
| byte 2 | Message Identifier LSB |
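Table 2-7 Control Packets that contain a Packet Identifier

| Message type | Is there a message identifier? |
| --- | --- |
| CONNECT | NO |
| CONNACK | NO |
| PUBLISH | YES (if QoS > 0) |
| PUBACK | YES |
| PUBREC | YES |
| PUBREL | YES |
| PUBCOMP | YES |
| SUBSCRIBE | YES |
| SUBACK | YES |
| UNSUBSCRIBE | YES |
| UNSUBACK | YES |
| PINGREQ | NO |
| PINGRESP | NO |
| DISCONNECT | NO |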
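Table 2-8 Control Packets that contain a Payload

| Control message | Payload |
| --- | --- |
| CONNECT | Required |
| CONNACK | None |
| PUBLISH | Optional |
| PUBACK | None |
| PUBREC | None |
| PUBREL | None |
| PUBCOMP | None |
| SUBSCRIBE | Required |
| SUBACK | Required |
| UNSUBSCRIBE | Required |
| UNSUBACK | None |
| PINGREQ | None |
| PINGRESP | None |
| DISCONNECT | None |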
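CONNECT flag

| Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| Description | User Name Flag | Password Flag | Will Retain | Will QoS | Will QoS | Will Flag | Clean Session | Reserved |
| byte 8 | X | X | X | X | X | X | X | 0 |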