Advance Innovation Centre
  • AIC Knowledge @ EEC for All
  • 😎Logical Thinking
    • Karel Robot
    • Code to Flowchart
    • Play with Docker
    • CNX Software
  • MCU & Interfacing with Infineon PSOC™
    • Basic MCU Interfacing
      • Introduction to CY8CKIT-062S2-43012 Pioneer Kit
      • Development Environment Preparation
      • PSoC™ 6S2 Peripherals Interfacing (GPIO)
        • Hello World and LED Blinking
        • GPIO Principles
        • PSoC™ 6S2 GPIO-HAL LED Blink Lab
        • PSoC™ 6S2 GPIO-PDL LED Blink Lab
        • Button "Bounce" Principles
          • Push/Pull Button to Turn ON/OFF LED via HAL
          • Push/Pull Button to Turn ON/OFF LED via PDL
          • GPIO Button Interrupt via HAL
          • GPIO Button Interrupt via PDL
        • GPIO variables & functions
      • PSoC™ 6S2 Peripherals Interfacing (ADC, PWM)
        • PSoC™ 6S2 SAR ADC
          • ADC Principles
          • PSoC™ 6S2 with ADC Labs
            • Reading potentiometer sensor value via an ADC HAL
            • Reading potentiometer sensor value via an ADC PDL
        • PSoC™ 6S2 PWM & TCPWM
          • PWM Principles
          • PSoC™ 6S2 for PMW Function Labs
            • LED Brightness using PWM via HAL
            • LED Brightness using PWM via PDL
    • Sensor Interfacing and HMI
      • OLED Display
        • OLED Display Principles
        • Calling BDH’s OLED functions
        • Display ADC via Potentiometer on OLED
      • BDH Shell
        • Shell Principles
        • LED Blinking and CAPSENSE via BDH Shell
        • Adding "History" command
        • Adding "Reboot" command
        • CAPSENSE Button and Slider
          • CAPSENSE Button and Slider with Capsense Tuner
          • CAPSENSE Button and Slider using FreeRTOS
    • Serial Communication & Visualization
      • UART, I2C, SPI Communication via Infineon PSoC™6
      • BMX160 Sensor Communication via Infineon PSoC™6
        • Reading ADC via HAL with Potentiometer and Displaying GUI on Serial Studio
        • Reading XENSIV-DPS-3XX Pressure Sensor and Displaying GUI on Serial Studio
        • Motion Sensors GUI Integration via Serial Studio
    • IoT Connectivity & Data Analytics via Node-Red
      • Node-Red Installation
      • Setting MQTTS to MQTT Broker
      • Sending PSoC6’s sensor to MQTT (node-red)
    • Edge AI on PSoC™
      • Machine Learning on PSoC™6 via Edge-Impulse
    • Infineon PSoC™ Troubleshooting
  • IoT Development with Infineon PSOC™ & BDH Platform
    • PSoC™ IoT Development Kit
      • Introduction to CY8CKIT-062S2-43012 Pioneer Kit
      • Development Environment Preparation
        • Hello World and LED Blinking
    • IoT Connectivity
      • Node-Red Installation
      • Controlling PSoC™ LED using MQTT
      • Setting MQTTS to MQTT Broker
      • Sending PSoC6’s sensor to MQTT (node-red)
    • BDH IoT Connectivity
    • WireLinX™ IoT PLC
    • BDH X-Brain Data Analytics
      • PSoC6 Data Collection to CSV log file
    • Data Visualization
      • สร้าง Dashboard ด้วย Looker Studio
  • 🖥️Operation Systems
    • Prerequisites
      • Guideline from Ubuntu
        • Ubuntu and VSCode on WSL2
      • ติดตั้ง WSL 2
      • Run Ubuntu on VirtualBox7
    • Zero to Linux Hero
      • Computer OS Architecture
      • Anatomy of Linux System
        • UNIX/Linux History
        • UNIX/Linux Evolution
        • GNU Project
        • Linux OS Architecture
        • Command Line Interface (CLI)
          • Basic Commands
          • 😎Level up your Linux Shell
          • File & Dir. Commands
          • Searching Commands
          • 😎ChatGPT-based Terminal
          • SysAdmin Commands
          • Network Commands
          • Hacker Commands
        • Busybox
        • Shell Script
          • Awk Script
          • Bash Shell Script
            • Bash Snippets
            • Bash Useful Examples
      • Anatomy of Linux Kernel
        • Linux Kernel Principles
        • Linux Environment for Developer
      • Anatomy of Embedded Linux
        • Embedded Linux
        • Host & Target
        • Cross Toolchains
        • Bootloader
        • Building Embedded Linux
    • Linux OS Dev. Engineer
      • Process Management
        • Process Basic
        • Process State
        • Basic Process Mgmt. Commands
        • Advance Process Mgmt. Commands
        • Process API Programming
      • IPC
        • IPC Anatomy
        • Signal Programming
        • Pipe Programming
        • FIFO Programming
        • Msg. Queue Programming
          • System V
        • Share Memory Programming
          • System V
        • Socket Programming
      • POSIX Threads
        • Multi-tasking Basic
        • POSIX Thread Anatomy
        • Threading Programming
      • Applied IPC
        • Remote Commander
        • Multi-Remote Commanders
      • Process Synchronization
        • Mutex Programming
        • Semaphore Programming
      • Applied IPC with Semaphore
  • ⌚Embedded Systems Development
    • Introduction to ESD
      • Why's ESD?
      • What it use for?
      • How it works?
    • Enbedded System Development via PSoC6
      • Basic MCU Interfacing
        • Introduction to CY8CKIT-062S2-43012 Pioneer Kit
        • Development Environment Preparation
        • PSoC™ 6S2 Peripherals Interfacing (GPIO)
          • Hello World and LED Blinking
          • GPIO Principles
          • PSoC™ 6S2 GPIO-HAL LED Blink Lab
          • PSoC™ 6S2 GPIO-PDL LED Blink Lab
          • Button "Bounce" Principles
            • Push/Pull Button to Turn ON/OFF LED via HAL
            • Push/Pull Button to Turn ON/OFF LED via PDL
            • GPIO Button Interrupt via HAL
            • GPIO Button Interrupt via PDL
          • GPIO variables & functions
        • PSoC™ 6S2 Peripherals Interfacing (ADC, PWM)
          • PSoC™ 6S2 SAR ADC
            • ADC Principles
            • PSoC™ 6S2 with ADC Labs
              • Reading potentiometer sensor value via an ADC HAL
              • Reading potentiometer sensor value via an ADC PDL
          • PSoC™ 6S2 PWM & TCPWM
            • PWM Principles
            • PSoC™ 6S2 for PMW Function Labs
              • LED Brightness using PWM via HAL
              • LED Brightness using PWM via PDL
      • Sensor Interfacing and HMI
        • OLED Display
          • OLED Display Principles
          • Calling BDH’s OLED functions
          • Display ADC via Potentiometer on OLED
        • BDH Shell
          • Shell Principles
          • LED Blinking and CAPSENSE via BDH Shell
          • Adding "History" command
          • Adding "Reboot" command
          • CAPSENSE Button and Slider
            • CAPSENSE Button and Slider with Capsense Tuner
            • CAPSENSE Button and Slider using FreeRTOS
      • Serial Communication & Visualization
        • UART, I2C, SPI Communication via Infineon PSoC™6
        • BMX160 Sensor Communication via Infineon PSoC™6
          • Reading ADC via HAL with Potentiometer and Displaying GUI on Serial Studio
          • Reading XENSIV-DPS-3XX Pressure Sensor and Displaying GUI on Serial Studio
          • Motion Sensors GUI Integration via Serial Studio
    • Edge Computing and IoT Connectivity
    • Cloud-Based Data Analytics and Digital Twin
    • Edge Vision AI
    • Resources
      • Basic Hardware and Firmware
        • Environment Preparation
          • การติดตั้งโปรแกรม Arduino IDE
            • ตัวอย่างการเริ่มต้นใช้งาน Arduino IDE
          • การติดตั้งโปรแกรมสำหรับใช้งานเครื่องมือวัด NI MyDAQ
            • ตัวอย่างการตั้งค่าใช้ Digital Multimeter -NI ELVISmx
            • ตัวอย่างการตั้งค่าใช้ Oscilloscope-NI ELVISmx
          • ติดตั้งโปรแกรม KingstVIS
        • Basic measurement
          • Basic Digital and Analog I/O
            • LAB: Basic Digital Input/Output
            • LAB: Basic Analog Input/Output
          • Waveform
            • LAB: Oscilloscope
            • LAB: Oscilloscope and Function Generator
            • LAB: Pulse Width Modulation (PWM)
              • Homework
        • Interfacing and Communication
          • LAB: UART, RS485, RS232 Protocol
          • LAB: I2C Protocol
            • HOMEWORK
          • LAB: SPI Protocol
      • IoT Connectivity
        • Example: IoT with MQTT on Node-red
        • Data logger
        • LAB: Data Visualization
  • 🛠️C/C++ for Embedded Programming
    • Development Environment Preparation
      • ติดตั้ง WSL 2
      • ติดตั้ง Ubuntu environment
      • ติดตั้งโปรแกรม Visual Studio Code
      • การเชื่อมต่อ Virtual studio code เข้ากับ WSL
      • ติดตั้ง docker on WSL
    • Principle C/C++ Programming
      • Get started with C++
      • Makefile
        • Makefile Examples
      • Compiling and running
        • How to create a program that you can enter inputs.
          • Lab 1 Exercise
      • Arguments
        • Command line arguments in C and C++
      • signed and unsigned data types
      • Variable and Operator
      • If and If else
      • Loop, Infinite loop, and flag
        • Loop and Flag exercise
      • Array
        • Get to know with arrays
        • Implement example
      • Vector
    • Object Oriented Programming (OOP) in C++
      • Class and Object
      • Encapsulation and Abstraction
      • Polymorphism and Inheritance
    • C/C++ Preprocessing
      • Macro
        • Quiz Macro
      • File Inclusion
      • Conditional Compilation
      • Pragma directive
        • Quiz Pragma
    • String in C++
      • Concatenation
      • Split
    • Type conversions for C/C++
      • Conversion using Cast operator
    • Error handling
    • Data logger
      • การสร้างไฟล์และเขียนไฟล์
      • การอ่านไฟล์
      • การเก็บข้อมูลกับTime stamp
    • High performance programing
      • Multi-task and Multi-thread
        • Multi-threading example
      • Mutex
      • Queue
      • OpenCV
    • C/C++ Techniques
      • Makefile in action
      • Object Oriented Programming (OOP) in C++
        • Class and Object
        • Encapsulation and Abstraction
        • Polymorphism and Inheritance
      • C/C++ Preprocessing
        • Macro
          • Quiz Macro
        • File Inclusion
        • Conditional Compilation
        • Pragma directive
          • Quiz Pragma
      • Binary, Octal and Hexadecimal Numbers
      • Array and properties of an array
        • Get to know with arrays
        • Implement example
      • What's next?
  • 🤖Artificial Intelligence (AI)
    • VAMStack Design House, BUU
    • Data Analytics
      • Data cleansing
      • Data analytics
      • Data analytic exercise
    • Machine Learning
      • Neural Network Layers
      • Machine learning type
      • Dataset
      • Using Edge Impulse for AI Model
    • Basic Image Processing
      • Computer Vision using Python Language
        • Installation
        • Computer Vision Basics
          • Pixel and Color
          • Draw image
          • Basic Image processing
          • Morphology Transformations
          • Gaussian blur
          • Simple Thresholding
          • Contour
          • Canny edge detection
        • Case Study
          • Coin counting
          • Color detection & tracking
        • VAM_CV SDK
  • ⚙️FPGA Design and Development
    • Verilog HDL via Vivado IDE
      • LAB1: Setting Environment and Create Project
        • Create Vivado Project
      • LAB2: Hardware Description Language Work Flow
        • Simulation code
      • LAB3: Design HDL Project
        • Top Level Design
        • Top-level Simulation
      • LAB4: Asynchronous VS Synchronous Circuit
        • Simulation Synchronous counter
    • C/C++ Programming on Ultra96v2 FPGA Board
      • Application C/C++ on Ultra96v2 Part 1
        • Design Overview
        • Step 1 - Burn the image to SD card
        • Step 2 - Bring up Ultra96v2
        • Step 3 - Installing the Vitis-AI runtime packages
      • Application C/C++ on Ultra96v2 Part 2
        • STEP 1 : Setting auto boot Wifi
        • STEP 2 : How to working on Embedded
        • STEP 3 : How to run the test code
  • 🤖Robotics
    • Dobot Magician
      • Instruction of Dobot
      • Software Download
      • Basically of Program
        • Teaching and Playback
        • Write and Draw
        • LaserEngraving
        • 3D Printer
    • Robotino
      • Software Download
        • Robotino View
        • Robotino SIM
      • Charging
      • Connecting
      • Follow Line example
        • Basic block in Follow Line
    • RaspBlock
      • Get Started with Raspblock
  • 🚩Special Topics
    • Node-Red
      • Set up Raspberry Pi
      • Install node red in Raspberry Pi
      • Get started with Node Red
        • Open node-red
        • Turn off node red
        • Install Dashboard on Node-red
        • Use node red to show message
        • Using Ultrasonic sensor with node-red
    • IoT Cloud
      • Overview
        • How do they work?
          • Basic Knowlege
      • Installations
        • Install Docker
        • Install Mosquitto Broker
        • Install InfluxDB
        • Install Telegraf
        • Install Grafana
      • Get Sensor Value and Send to MQTT
        • Connect ESP3266 to sensor
        • Connect ESP3266 to MQTT
      • Integration
    • Senses IoT
      • SENSES IoT Platform
      • LAB8: MCU send data to IoT platform
    • CrowPi Dev Kit
      • Raspberry Pi with CrowPi
      • Remote to Raspberry Pi
      • Cross-Compile
        • Lab 1: Programming and cross complier
      • Hardware and Interfaces Usage CLI
        • LAB: Usage GPIO via CLI
        • LAB: Scan I2C bus via CLI
      • Python library for Crow Pi
      • wiringPi library (C) for CrowPi
        • Lab2: Crowpi and sensors
    • LVGL Development
      • LVGL - Light and Versatile Embedded Graphics Library
        • Setting program for LVGL Simulator
        • Get started with LVGL simulator
        • Example Library of LVGL
        • Create your own screen
          • Exercise
        • Style
          • Exercise
        • Event
    • Docker OS
      • Docker OS Part 1
        • Part 1 : Installation
        • Part 2 : Basic Docker OS and Linux CLI
      • Docker OS Part 2
        • Part 1 : Docker communication
        • Part 2 : Docker compose
      • Application Gstreamer on devcontainer
        • STEP 1 : Setting gstreamer environment
        • STEP 2 : Create the Gstreamer element on template
        • STEP 3 : Testing and application on your gst element
  • 🤟Recommended by AIC
    • Skill Roadmap
      • Embedded Engineer
      • Developer
    • Hardware Programming
    • Embedded Programming
    • General-propose Programming
    • Algorithmica
    • Thai Expert Knowledge
    • RT-Thread University Program
      • Infineon PSoC6
      • Kernel
        • Kernel Basics
        • Thread Management
        • Clock Management
        • Inter-thread synchronization
        • Inter-thread communication
        • Memory Management
        • Interrupt Management
        • Kernel porting
        • Atomic Operations
        • RT-Thread SMP
        • Kernel API Changelog
      • Tools
      • Devices & Drivers
        • SENSOR Devices
        • Touch Equipment
        • CRYPTO Devices
        • AUDIO Devices
        • Pulse Encoder Devices
      • Components
        • C Library (libc)
        • ISO/ANSI C Standard
        • POSIX Standard
          • FILE (File IO)
          • Pthread
          • Timer
          • IPC Semaphore
          • IPC Message Queues
          • Dynamic Modules
        • Network Components
          • FinSH Console
          • FAL: Flash Abstraction Layer
          • Virtual File System
          • tmpfs: temporary file system
          • ulog log
          • utest testing framework
          • Power Management
          • RT-Link
        • Software Packages
          • Internet of Things
            • MQTT-umqtt
            • Telnet
          • Tools
            • SystemView
            • SEGGER_RTT
          • LVGL Manual
            • Touch Screen Driver
      • Demo
        • Infineon Gateway
        • Handwriting Recognition (MNIST)
        • Object Detection (Darknet)
        • ROS using RT-Thread
        • Control the car using RT-Thread
        • LiDAR via RT-Thread
        • Detection via RT-Thread and ROS
        • Sensor Driver Development Guide
Powered by GitBook

Assoc. Prof. Wiroon Sriborrirux, Founder of Advance Innovation Center (AIC) and Bangsaen Design House (BDH), Electrical Engineering Department, Faculty of Engineering, Burapha University

On this page
  • 1. MQTT background application
  • 2. MQTT message structure
  • 2.1. Fixed header
  • Variable header
  • Payload
  • 3. MQTT message format
  • 4. Implementation of uMQTT
  • 4.1. uMQTT framework
  • uMQTT Client
  • uMQTT and LWIP
  • 4.4. uMQTT sends group packets
  • 4.5. uMQTT receiving and unpacking
  • 4.6. uMQTT data flow

Was this helpful?

  1. Recommended by AIC
  2. RT-Thread University Program
  3. Components
  4. Software Packages
  5. Internet of Things

MQTT-umqtt

PreviousInternet of ThingsNextTelnet

Last updated 7 months ago

Was this helpful?

MQTT is a machine-to-machine ( M2M)/Internet of Things ( IoT) connection protocol. Its full English name is " Message Queuing Telemetry Transport ", and its Chinese translation is " Message Queuing Telemetry Transport " protocol. It is designed for constrained devices and low-bandwidth, high-latency or unreliable networks. It is a "lightweight" communication protocol based on the 发布/ 订阅( publish/ subscribe) mode. The protocol is built on TCP/IPthe protocol and was released by IBM in 1999. At present, the latest version of the protocol is V5.0, and the commonly used version is V3.1.1.

Let's take an example to illustrate the actual application scenario of MQTT. As shown in Figure 1-1, it well shows a communication network system case based on the MQTT protocol:

Figure 1-1 Communication example based on MQTT

Definition of noun:

  • Publisher​

  • Broker​

  • Subscriber​

  • Topic - the topic for publish/subscribe

Process Overview: In the figure above, the role of each type of sensor is publisher. For example, the humidity sensor and temperature sensor publish two topics named "Moisture" and "Temp" to the connected MQTT Broker (periodically); of course, along with the two topics, there are also humidity values ​​and temperature values, which are called "messages". The role of several clients is subscriber. For example, if the mobile phone APP subscribes to the "Temp" topic from the Broker, the temperature value published by the temperature sensor in the Broker can be obtained on the mobile phone.

Additional notes:

  1. The roles of publishers and subscribers are not fixed but relative. A publisher can also subscribe to topics from a broker at the same time, and a subscriber can publish topics to a broker. That is, a publisher can be a subscriber and a subscriber can be a publisher.

  2. Broker can be an online cloud server or a local LAN client. According to the requirements, Broker itself will also include some functions of subscribing/publishing topics.

For more reference materials, please visit or .


Learning objectives:

  1. Understand the MQTT protocol and its message format

  2. Understanding the relationship between uMQTT and LWIP

  3. Understand the implementation principle of uMQTT

Any general/private protocol is composed of various message data packets that are pre-defined and constrained by certain rules, and MQTT is no exception. In the MQTT protocol, all data packets are composed of a maximum of three parts: 固定header+ 可变header+ 有效载荷, as shown in Table 2-1:

[Fixed header](#2.1. Fixed header)(at least 2 Bytes)

[Variable header](#2.2. Variable header)

[Payload](#2.3. Payload*)

Present in all MQTT control packets

Exists in some MQTT control packets

Exists in some MQTT control packets

Bytes[0] Bytes[1]...

... ...

...Bytes[N-1] Bytes[N]

Table 2-1 MQTT data packet format

Among them, the fixed header is required, and the variable header and payload are optional. Therefore, in theory, the minimum length of the MQTT protocol data packet is 2 bytes, which minimizes the extra resource consumption it occupies. Next, we will take MQTT3.1.1 as an example to introduce the detailed format of each part of the message (for detailed usage, see [**4. Implementation of uMQTT**](#4. Implementation of uMQTT)).

Bit

7

6

5

4

3

2

1

0

Bytes[0]

MQTT Control Packet type

MQTT Control Packet Type Flags (Flags specific to each MQTT Control Packet type)

Bytes[1]...

Remaining Length

Table 2 - 2 Fixed header format

The fixed header consists of at least two bytes, as shown in Table 2-2. The high 4 bits of the first byte describe the type of the current data packet ([MQTT Control Packet type](#2.1.1 MQTT Control Packet type)), see Table 2-3 below; the low 4 bits define the flag bits related to the packet type ([Flags specific to each MQTT Control Packet type](#2.1.2 Flags specific to each MQTT Control Packet type)), see Table 2-4 below; the second and subsequent up to 4 bytes represent the byte length of the remaining data ([Remaining Length](#2.1.3 Remaining Length)), see Table 2-5 below.

name

value

Bytes[0]

describe

Reserved

0

0x0*

Reserved seat

CONNECT

1

0x1*

Client requests connection

CONNACK

2

0x2*

Server connection confirmation

PUBLISH

3

0x3*

Publish a message

PUBACK

4

0x4*

Release Confirmation (QoS1)

PUBREC

5

0x5*

Publish Received (QoS2 - Guaranteed Delivery Part 1)

PUBREL

6

0x6*

Release Release (QoS2 - Guaranteed Delivery Part 2)

PUBCOMP

7

0x7*

Release Complete (QoS2 - Guaranteed Delivery Part 3)

SUBSCRIBE

8

0x8*

Client subscription request

SUBACK

9

0x9*

Server subscription confirmation

UNSUBSCRIBE

10

0xA*

Client unsubscribe request

UNSUBACK

11

0xB*

Server-side unsubscribe confirmation

PINGREQ

12

0xC*

Client heartbeat (PING) request

PINGRESP

13

0xD*

Server heartbeat (PING) response

DISCONNECT

14

0xE*

Client is about to disconnect

Reserved

15

0xF*

Reserved seat

Table 2-3 Control Packet type

Control message type

Fixed header flags

Bit 3

Bit 2

Bit 1

Bit 0

CONNECT

Reserved

0

0

0

0

CONNACK

Reserved

0

0

0

0

PUBLISH

Used in MQTT 3.1.1

DUP

QoS

QoS

RETAIN

PUBACK

Reserved

0

0

0

0

PUBREC

Reserved

0

0

0

0

PUBREL

Reserved

0

0

1

0

PUBCOMP

Reserved

0

0

0

0

SUBSCRIBE

Reserved

0

0

1

0

SUBACK

Reserved

0

0

0

0

UNSUBSCRIBE

Reserved

0

0

1

0

UNSUBACK

Reserved

0

0

0

0

PINGREQ

Reserved

0

0

0

0

PINGRESP

Reserved

0

0

0

0

DISCONNECT

Reserved

0

0

0

0

Table 2-4 Control Packet Flags

Bits[3..0 ]** of Bytes[0] ** in the Fixed header contain the special identifiers for each MQTT control message type, as shown in Tables 2 - 4 above. Although the flags in the table are "Reserved", they are reserved for future versions, but they must be assigned the values ​​in the table during data transmission. If an illegal flag that does not match the table is identified, the receiver must close the network connection.

Among them, three flags have been used in version 3.1.1:

  • DUP = Duplicate flag of control message [Duplicate]

  • QoS = Quality of Service for PUBLISH messages

  • RETAIN = PUBLISH message retention flag

Number of bytes

from (Bytes[1], Bytes[2], Bytes[3], Bytes[4])

Stop(Bytes[1], Bytes[2], Bytes[3], Bytes[4])

1

0 (0x00)

127 (0x7F)

2

128 (0x80, 0x01)

16 383 (0xFF, 0x7F)

3

16 384 (0x80, 0x80, 0x01)

2 097 151 (0xFF, 0xFF, 0x7F)

4

2 097 152 (0x80, 0x80, 0x80, 0x01)

268 435 455 (0xFF, 0xFF, 0xFF, 0x7F)

Table 2-5 Remaining Length

The remaining length indicates the number of bytes in the remaining part of the current message, including the variable header and payload data. The remaining length does not include the number of bytes used to encode the remaining length field itself, that is, it does not include the length of the Fixed header.

The remaining length field uses a variable length encoding scheme, as shown in Table 2 - 5 above:

  • Values ​​less than 128 are encoded using a single byte.

  • Larger values ​​are handled as follows - the least significant 7 bits are used to encode the data, and the most significant bit is used to indicate whether there are more bytes. Therefore, each byte can encode 128 values ​​and a continuation bit . The continuation bit only indicates the carry function and is not included in the calculation of the length.

  • This field can be up to 4 bytes.

Some MQTT control packets contain a variable header part, which is located between the fixed header and the payload. The content of the variable header varies depending on the control packet type ([MQTT Control Packet type](#2.1.1 MQTT Control Packet type)); for example, for a CONNECT type packet, its variable header contains four fields: "Protocol Name", "Protocol Level", "Connect Flags" and "Keep Alive". For other variable headers, see [MQTT Message Format](#3. MQTT Message Format).

Although there are various variable header contents, the Packet Identifier is an important universal field and has the following 唯一性:

byte

value

byte 1

Message Identifier MSB

byte 2

Message Identifier LSB

Table 2-6 Packet Identifier

This general field exists in many types of messages, see Table 2-7:

Message Type

Is there a message identifier?

CONNECT

NO

CONNACK

NO

PUBLISH

YES (If QoS > 0)

PUBACK

YES

PUBREC

YES

PUBREL

YES

PUBCOMP

YES

SUBSCRIBE

YES

SUBACK

YES

UNSUBSCRIBE

YES

UNSUBACK

YES

PINGREQ

NO

PINGRESP

NO

DISCONNECT

NO

Table 2 - 7 Control Packets that contain a Packet Identifier

The rules for Packet Identifier are as follows:

  • SUBSCRIBE, UNSUBSCRIBE, and PUBLISH (in the case of QoS > 0) control packets MUST contain a non-zero 16-bit Message Identifier.

  • Each time a client sends a new packet of one of these types, it MUST assign it a Message Identifier that is not currently in use.

  • If a client resends a particular control packet, it MUST use the same Message Identifier in subsequent retransmissions of that packet. The Message Identifier can be reused after the client processes the ACK packet corresponding to that particular control packet.

    • The PUBLISH packet of QoS 1 corresponds to PUBACK.

    • The PUBLISH packet of QoS 2 corresponds to PUBCOMP.

    • SUBSCRIBE and UNSUBSCRIBE are corresponding to each other by SUBACK and UNSUBACK.

  • When the server sends a PUBLISH message with QoS>0, the above rules should also be followed.

  • A PUBLISH message with QoS=0 is not allowed to contain a message identifier.

  • The message identifier of the PUBACK, PUBREC, and PUBREL packets must be the same as the value originally sent in the PUBLISH packet.

  • Similarly, the SUBACK and UNSUBACK packets must have the same message identifier as the SUBSCRIBE and UNSUBSCRIBE packets.

The client and server assign message identifiers independently of each other. Therefore, a client-server combination using the same message identifier can achieve concurrent information exchange.

Some MQTT control messages contain a payload at the end of the packet. For details, see [MQTT message format](#3. MQTT message format). For example, for a PUBLISH message, the payload can be an application message. Table 2 - 8 lists the various control messages that require a payload.

Control message

Payload

CONNECT

Required

CONNACK

None

PUBLISH

Optional

PUBACK

None

PUBREC

None

PUBREL

None

PUBCOMP

None

SUBSCRIBE

Required

SUBACK

Required

UNSUBSCRIBE

Required

UNSUBACK

None

PINGREQ

None

PINGRESP

None

DISCONNECT

None

Table 2-8 Control Packets that contain a Payload

Since there are many formats for Variable header and Payload, this article will not go into details. For details, please refer to the following links:

The uMQTT package features are as follows:

  • Implement basic connection, subscription, and publishing functions;

  • It has multiple heartbeat keep-alive and device reconnection mechanisms to ensure the online status of MQTT and adapt to complex situations;

  • Supports three types of information quality: QoS=0, QoS=1, QoS=2;

  • Support multi-client use;

  • The user-side interface is simple and has multiple external callback functions.

  • Supports multiple configurable technical parameters, easy to use and convenient for product development;

  • It has powerful functions, low resource usage, and supports tailorable functions.

The uMQTT software package is mainly used to implement the MQTT protocol on embedded devices. The main work of the software package is based on the MQTT protocol. The software package structure diagram is shown in Figure 4-1:

Figure 4-1 uMQTT structure diagram

The main tasks in the software package implementation process are:

  1. According to the MQTT 3.1.1 protocol, perform package data protocol encapsulation and unpacking;

  2. The transport layer function is adapted to connect to the SAL (Socket Abstraction Layer) layer;

  3. The uMQTT client layer writes an interface that conforms to the application layer based on the protocol packet layer and the transport layer. It implements basic connection, disconnection, subscription, unsubscription, message publishing and other functions. It supports QoS0/1/2 for sending information. It uses the uplink timer to implement multiple heartbeat keep-alive mechanisms and device reconnection mechanisms, increase the online stability of the device, and adapt to complex situations.

As shown in Figure 1-1, to connect to the Broker, the embedded device needs to be used as a client in the MQTT protocol.

In the uMQTT component umqtt.hfile, the MQTT configuration information used to initialize the client is abstracted to form the corresponding data structure:

struct umqtt_info
{
    rt_size_t send_size, recv_size;                     /* 收发缓冲区大小 */ 
    const char *uri;                                    /* 完整的URI (包含: URI + URN) */ 
    const char *client_id;                              /* 客户端ID */ 
    const char *lwt_topic;                              /* 遗嘱主题 */
    const char *lwt_message;                            /* 遗嘱消息 */ 
    const char *user_name;                              /* 用户名 */ 
    const char *password;                               /* 密码 */ 
    enum umqtt_qos lwt_qos;                             /* 遗嘱QoS */
    umqtt_subscribe_cb lwt_cb;                          /* 遗嘱回调函数 */
    rt_uint8_t reconnect_max_num;                       /* 最大重连次数 */ 
    rt_uint32_t reconnect_interval;                     /* 最大重连时间间隔 */ 
    rt_uint8_t keepalive_max_num;                       /* 最大保活次数 */ 
    rt_uint32_t keepalive_interval;                     /* 最大保活时间间隔 */ 
    rt_uint32_t recv_time_ms;                           /* 接收超时时间 */ 
    rt_uint32_t connect_time;                           /* 连接超时时间 */ 
    rt_uint32_t send_timeout;                           /* 上行(发布/订阅/取消订阅)超时时间 */ 
    rt_uint32_t thread_stack_size;                      /* 线程栈大小 */ 
    rt_uint8_t thread_priority;                         /* 线程优先级 */ 
#ifdef PKG_UMQTT_TEST_SHORT_KEEPALIVE_TIME
    rt_uint16_t connect_keepalive_sec;                  /* 连接信息,保活秒数 */    
#endif
};copymistakeCopy Success

These configuration information generally need to be filled in and specified before creating a uMQTT client, such as key information such as Broker's "URI", "user name" or "password". Other non-critical information, if not specified, will be umqtt_createcalled in the client creation function umqtt_check_def_infoto assign default values:

static void umqtt_check_def_info(struct umqtt_info *info)
{
    if (info) 
    {
        if (info->send_size == 0) { info->send_size = PKG_UMQTT_INFO_DEF_SENDSIZE; }
        if (info->recv_size == 0) { info->recv_size = PKG_UMQTT_INFO_DEF_RECVSIZE; }
        if (info->reconnect_max_num == 0) { info->reconnect_max_num = PKG_UMQTT_INFO_DEF_RECONNECT_MAX_NUM; }
        if (info->reconnect_interval == 0) { info->reconnect_interval = PKG_UMQTT_INFO_DEF_RECONNECT_INTERVAL; }
        if (info->keepalive_max_num == 0) { info->keepalive_max_num = PKG_UMQTT_INFO_DEF_KEEPALIVE_MAX_NUM; }
        if (info->keepalive_interval == 0) { info->keepalive_interval = PKG_UMQTT_INFO_DEF_HEARTBEAT_INTERVAL; }
        if (info->connect_time == 0) { info->connect_time = PKG_UMQTT_INFO_DEF_CONNECT_TIMEOUT; }
        if (info->recv_time_ms == 0) { info->recv_time_ms = PKG_UMQTT_INFO_DEF_RECV_TIMEOUT_MS; }
        if (info->send_timeout == 0) { info->send_timeout = PKG_UMQTT_INFO_DEF_SEND_TIMEOUT; }
        if (info->thread_stack_size == 0) { info->thread_stack_size = PKG_UMQTT_INFO_DEF_THREAD_STACK_SIZE; }
        if (info->thread_priority == 0) { info->thread_priority = PKG_UMQTT_INFO_DEF_THREAD_PRIORITY; }
    }
}copymistakeCopy Success

However, with only the above information, it is not possible to run an MQTT client. Therefore umqtt.c, in the structure containing umqtt_infoall umqtt_clientthe data used to initialize the client:

struct umqtt_client
{
    int sock;                                                   /* 套接字 */ 
    enum umqtt_client_state connect_state;                      /* mqtt客户端状态 */ 

    struct umqtt_info mqtt_info;                                /* mqtt用户配置信息 */ 
    rt_uint8_t reconnect_count;                                 /* mqtt客户端重连计数 */ 
    rt_uint8_t keepalive_count;                                 /* mqtt保活计数 */ 
    rt_uint32_t pingreq_last_tick;                              /* mqtt的PING请求上一次滴答值 */
    rt_uint32_t uplink_next_tick;                               /* 上行连接的下一次滴答值 */ 
    rt_uint32_t uplink_last_tick;                               /* 上行连接的上一次滴答值 */ 
    rt_uint32_t reconnect_next_tick;                            /* 客户端断开重连时的下一次滴答值 */ 
    rt_uint32_t reconnect_last_tick;                            /* 客户端断开重连时的上一次滴答值 */ 

    rt_uint8_t *send_buf, *recv_buf;                            /* 收发缓冲区指针 */ 
    rt_size_t send_len, recv_len;                               /* 收发数据的长度 */ 

    rt_uint16_t packet_id;                                      /* mqtt报文标识符 */ 

    rt_mutex_t lock_client;                                     /* mqtt客户端互斥锁 */ 
    rt_mq_t msg_queue;                                          /* mqtt客户端消息队列 */ 

    rt_timer_t uplink_timer;                                    /* mqtt保活重连定时器 */ 

    int sub_recv_list_len;                                      /* 接收订阅信息的链表长度 */ 
    rt_list_t sub_recv_list;                                    /* 订阅消息的链表头 */ 

    rt_list_t qos2_msg_list;                                    /* QoS2的消息链表 */
    struct umqtt_pubrec_msg pubrec_msg[PKG_UMQTT_QOS2_QUE_MAX]; /* 发布收到消息数组(QoS=2) */                   

    umqtt_user_callback user_handler;                           /* 用户句柄 */ 

    void *user_data;                                            /* 用户数据 */ 
    rt_thread_t task_handle;                                    /* umqtt任务线程 */ 

    rt_list_t list;                                             /* umqtt链表头 */ 
};copymistakeCopy Success

The structure and enumeration type definitions of some of the above members can umqtt.hbe viewed in the file. The structure will be initialized after the client function is created umqtt_createand called umqtt_check_def_info:

  1. Initialize the will data structure (if any)

  2. Apply memory for the send and receive buffers

  3. Create a mutex, message queue, and timeout reconnection timer (timeout callback to achieve reconnection + keep alive)

  4. Initialize each linked list

  5. Create umqtt_thread- mqtt data sending and receiving thread

  6. Returns mqtt_clientthe structure address

When the value returned in step 6 is not empty, you can call the function to send a CONNECT message to connect to the Broker umqtt_startthrough LWIP ; after the connection is successful , the thread will be started and the MQTT communication will be started.umqtt_thread

In umqtt_startthe function, the state of the uMQTT client is first set to UMQTT_CS_LINKING, which means 正在连接中. Next, the function is called umqtt_connectto connect the local client to the Broker.

Connecting to a Broker is a two-step process:

  1. Create a socket and establish a link connection with the Broker

  2. Send a CONNECT message to create an MQTT protocol connection

In umqtt_connectthe function, umqtt_trans_connectthe first step is completed by calling the function:

/** 
 * TCP/TLS Connection Complete for configured transport
 *
 * @param uri the input server URI address
 * @param sock the output socket
 *
 * @return <0: failed or other error
 *         =0: success
 */
int umqtt_trans_connect(const char *uri, int *sock)
{
    int _ret = 0;
    struct addrinfo *addr_res = RT_NULL;

    *sock = -1;
    /* 域名解析 */
    _ret = umqtt_resolve_uri(uri, &addr_res);
    if ((_ret < 0) || (addr_res == RT_NULL)) 
    {
        LOG_E("resolve uri err");
        _ret = UMQTT_FAILED;
        goto exit;
    }
    /* 创建套接字 */
    if ((*sock = socket(addr_res->ai_family, SOCK_STREAM, UMQTT_SOCKET_PROTOCOL)) < 0) 
    {
        LOG_E("create socket error!");
        _ret = UMQTT_FAILED;
        goto exit;
    }
    /* 设置套接字工作在非阻塞模式下 */
    _ret = ioctlsocket(*sock, FIONBIO, 0);
    if (_ret < 0) 
    {
        LOG_E(" iocontrol socket error!");
        _ret = UMQTT_FAILED;
        goto exit;
    }
    /* 建立连接 */
    if ((_ret = connect(*sock, addr_res->ai_addr, addr_res->ai_addrlen)) < 0) 
    {
        LOG_E(" connect err!");
        closesocket(*sock);
        *sock = -1;
        _ret = UMQTT_FAILED;
        goto exit;
    }

exit:
    if (addr_res) {
        freeaddrinfo(addr_res);
        addr_res = RT_NULL;
    }
    return _ret;
}copymistakeCopy Success

This function is the core function for uMQTT to establish a connection with Broker through LWIP. And from the [uMQTT framework diagram](#**4.1. uMQTT structure framework**), we can know that this function uses SAL , the socket abstraction layer component, to call the relevant interface to access LWIP. Some of the functions encapsulated by the SAL component ( used to resolve the domain name getaddrinfoin umqtt_resolve_urithe function) are as follows:

int getaddrinfo(const char *nodename,
       const char *servname,
       const struct addrinfo *hints,
       struct addrinfo **res)
{
    return sal_getaddrinfo(nodename, servname, hints, res);
}
---------------------------------------------------------------------------------------------
#define connect(s, name, namelen)                          sal_connect(s, name, namelen)
#define recvfrom(s, mem, len, flags, from, fromlen)        sal_recvfrom(s, mem, len, flags, from, fromlen)
#define send(s, dataptr, size, flags)                      sal_sendto(s, dataptr, size, flags, NULL, NULL)
#define socket(domain, type, protocol)                     sal_socket(domain, type, protocol)
#define closesocket(s)                                     sal_closesocket(s)
#define ioctlsocket(s, cmd, arg)                           sal_ioctlsocket(s, cmd, arg)copymistakeCopy Success

When the uMQTT client successfully establishes a link layer connection with the Broker, it will immediately send a CONNECT message to establish an MQTT protocol layer connection.

The uMQTT component uses a clever structure + union to manage all sent and received messages:

union umqtt_pkgs_msg                                /* mqtt message packet type */ 
{
    struct umqtt_pkgs_connect     connect;          /* connect */ 
    struct umqtt_pkgs_connack     connack;          /* connack */ 
    struct umqtt_pkgs_publish     publish;          /* publish */ 
    struct umqtt_pkgs_puback      puback;           /* puback */ 
    struct umqtt_pkgs_pubrec      pubrec;           /* publish receive (QoS 2, step_1st) */ 
    struct umqtt_pkgs_pubrel      pubrel;           /* publish release (QoS 2, step_2nd) */ 
    struct umqtt_pkgs_pubcomp     pubcomp;          /* publish complete (QoS 2, step_3rd) */ 
    struct umqtt_pkgs_subscribe   subscribe;        /* subscribe topic */ 
    struct umqtt_pkgs_suback      suback;           /* subscribe ack */ 
    struct umqtt_pkgs_unsubscribe unsubscribe;      /* unsubscribe topic */ 
    struct umqtt_pkgs_unsuback    unsuback;         /* unsubscribe ack */ 
};

struct umqtt_msg
{
    union umqtt_pkgs_fix_header header;             /* fix header */ 
    rt_uint32_t msg_len;                            /* message length */ 
    union umqtt_pkgs_msg msg;                       /* retain payload message */ 
};copymistakeCopy Success
/** 
 * packaging the data according to the format
 *
 * @param type the input packaging type
 * @param send_buf the output send buf, result of the package
 * @param send_len the output send buffer length
 * @param message the input message
 *
 * @return <=0: failed or other error
 *         >0: package data length
 */
int umqtt_encode(enum umqtt_type type, rt_uint8_t *send_buf, size_t send_len, struct umqtt_msg *message)
{
    int _ret = 0;
    switch (type)
    {
    case UMQTT_TYPE_CONNECT:
        _ret = umqtt_connect_encode(send_buf, send_len, &(message->msg.connect));
        break;
    case UMQTT_TYPE_PUBLISH:
        _ret = umqtt_publish_encode(send_buf, send_len, message->header.bits.dup, message->header.bits.qos, &(message->msg.publish));
        break;
    case UMQTT_TYPE_PUBACK:
        _ret = umqtt_puback_encode(send_buf, send_len, message->msg.puback.packet_id);
        break;
    case UMQTT_TYPE_PUBREC:
        // _ret = umqtt_pubrec_encode();
        break;
    case UMQTT_TYPE_PUBREL:
        _ret = umqtt_pubrel_encode(send_buf, send_len, message->header.bits.dup, message->msg.pubrel.packet_id);
        break;
    case UMQTT_TYPE_PUBCOMP:
        _ret = umqtt_pubcomp_encode(send_buf, send_len, message->msg.pubcomp.packet_id);
        break;
    case UMQTT_TYPE_SUBSCRIBE:
        _ret = umqtt_subscribe_encode(send_buf, send_len, &(message->msg.subscribe));
        break;
    case UMQTT_TYPE_UNSUBSCRIBE:
        _ret = umqtt_unsubscribe_encode(send_buf, send_len, &(message->msg.unsubscribe));
        break;
    case UMQTT_TYPE_PINGREQ:
        _ret = umqtt_pingreq_encode(send_buf, send_len);
        break;
    case UMQTT_TYPE_DISCONNECT:
        _ret = umqtt_disconnect_encode(send_buf, send_len);
        break;
    default:
        break;
    }
    return _ret;
}
copymistakeCopy Success

The macro definition of the MQTT control packet type corresponds to [2.1.1 MQTT Control Packet type](#2.1.1 MQTT Control Packet type):

enum umqtt_type
{
    UMQTT_TYPE_RESERVED      = 0,
    UMQTT_TYPE_CONNECT       = 1,
    UMQTT_TYPE_CONNACK       = 2,
    UMQTT_TYPE_PUBLISH       = 3,
    UMQTT_TYPE_PUBACK        = 4,
    UMQTT_TYPE_PUBREC        = 5,
    UMQTT_TYPE_PUBREL        = 6,
    UMQTT_TYPE_PUBCOMP       = 7,
    UMQTT_TYPE_SUBSCRIBE     = 8,
    UMQTT_TYPE_SUBACK        = 9,
    UMQTT_TYPE_UNSUBSCRIBE   = 10,
    UMQTT_TYPE_UNSUBACK      = 11,
    UMQTT_TYPE_PINGREQ       = 12,
    UMQTT_TYPE_PINGRESP      = 13,
    UMQTT_TYPE_DISCONNECT    = 14,
};copymistakeCopy Success
  1. Fill in the default configuration information of the MQTT client

        encode_msg.msg.connect.protocol_name_len = PKG_UMQTT_PROTOCOL_NAME_LEN;
        encode_msg.msg.connect.protocol_name = PKG_UMQTT_PROTOCOL_NAME;
        encode_msg.msg.connect.protocol_level = PKG_UMQTT_PROTOCOL_LEVEL; /* MQTT3.1.1 ver_lvl:4;  MQTT3.1 ver_lvl:3 */ 
        encode_msg.msg.connect.connect_flags.connect_sign = UMQTT_DEF_CONNECT_FLAGS;
    #ifdef PKG_UMQTT_TEST_SHORT_KEEPALIVE_TIME
        encode_msg.msg.connect.keepalive_interval_sec = ((client->mqtt_info.connect_keepalive_sec == 0) ? PKG_UMQTT_CONNECT_KEEPALIVE_DEF_TIME : client->mqtt_info.connect_keepalive_sec);
    #else
        encode_msg.msg.connect.keepalive_interval_sec = PKG_UMQTT_CONNECT_KEEPALIVE_DEF_TIME;
    #endif
        encode_msg.msg.connect.client_id = client->mqtt_info.client_id;
        encode_msg.msg.connect.will_topic = client->mqtt_info.lwt_topic;
        encode_msg.msg.connect.will_message = client->mqtt_info.lwt_message;
        encode_msg.msg.connect.user_name = client->mqtt_info.user_name;
        if (client->mqtt_info.user_name) 
        {
            encode_msg.msg.connect.connect_flags.bits.username_flag = 1;
        }
        encode_msg.msg.connect.password = client->mqtt_info.password;
        if (client->mqtt_info.password) {
            encode_msg.msg.connect.connect_flags.bits.password_flag = 1;
            encode_msg.msg.connect.password_len = rt_strlen(client->mqtt_info.password);
        }copymistakeCopy Success
  2. Call umqtt_encode→ umqtt_connect_encodeencoding function (only encapsulated MQTTSerialize_connect) package:

    static int MQTTSerialize_connect(unsigned char* buf, int buflen, MQTTPacket_connectData* options)
    {
        unsigned char *ptr = buf;
        MQTTHeader header = { 0 };
        int len = 0;
        int rc = -1;
    
        if (umqtt_pkgs_len(len = MQTTSerialize_connectLength(options)) > buflen) 
        {
            rc = UMQTT_BUFFER_TOO_SHORT;
            goto exit;
        }
    
        header.byte = 0;
        header.bits.type = UMQTT_TYPE_CONNECT;
        umqtt_writeChar(&ptr, header.byte);                                 /* 写固定header的第一个字节 */
    
        ptr += umqtt_pkgs_encode(ptr, len);                                 /* 写剩余长度 */
    
        if (options->protocol_level == 4)                                     /* MQTT V3.1.1 */
        {
            umqtt_writeCString(&ptr, "MQTT");
            umqtt_writeChar(&ptr, (char) 4);
        } 
        else 
        {
            umqtt_writeCString(&ptr, "MQIsdp");                                /* MQTT V3.1 */
            umqtt_writeChar(&ptr, (char) 3);
        }
    
        umqtt_writeChar(&ptr, options->connect_flags.connect_sign);
        umqtt_writeInt(&ptr, options->keepalive_interval_sec);
        // umqtt_writeInt(&ptr, PKG_UMQTT_CONNECT_KEEPALIVE_DEF_TIME);                                       /* ping interval max, 0xffff */ 
        umqtt_writeMQTTString(&ptr, options->client_id);
        if (options->connect_flags.bits.will_flag) 
        {
            umqtt_writeMQTTString(&ptr, options->will_topic);
            umqtt_writeMQTTString(&ptr, options->will_message);
        }
    
        if (options->connect_flags.bits.username_flag)
            umqtt_writeMQTTString(&ptr, options->user_name);
        if (options->connect_flags.bits.password_flag)
            umqtt_writeMQTTString(&ptr, options->password);
    
        rc = ptr - buf;
    
    exit:
        return rc;
    }copymistakeCopy Success

    The function is first called MQTTSerialize_connectLengthto calculate the length of the variable header and payload . The obtained len will be passed to umqtt_pkgs_lenthe function as a parameter. Its function is to calculate the number of bytes of the field in the fixed header and add the length of the first byte of the fixed header , which is 1, and compare it with buflen to determine the validity of the packet data.剩余长度

    Why is this kind of compound statement used here if (umqtt_pkgs_len(len = MQTTSerialize_connectLength(options)) > buflen) ?

    Because the len length we want to get is the value in the fixed header剩余长度 to facilitate the subsequent packet assembly process, and the effective message length buflen = len + 1 + 剩余长度the number of bytes in the field; if the message length is calculated directly, when the value is written later 剩余长度, it is also necessary to subtract its own byte length and the first byte length of the fixed header, which is 1, which is more complicated and cumbersome.

    Here are some important structures and unions to correspond to the MQTT protocol:

    • Fixed header

      Please refer to [**2.1. Fixed header**](#**2.1. Fixed header**)

      union umqtt_pkgs_fix_header
      {
          rt_uint8_t byte;                                /* header */ 
          struct {        
              rt_uint8_t retain: 1;                       /* reserved bits */ 
              rt_uint8_t qos:    2;                       /* QoS, 0-Almost once; 1-Alteast once; 2-Exactly once */ 
              rt_uint8_t dup:    1;                       /* dup flag */ 
              rt_uint8_t type:   4;                       /* MQTT packet type */ 
          } bits;
      };copymistakeCopy Success
    • CONNECT flag

      Bit

      7

      6

      5

      4

      3

      2

      1

      0

      describe

      User Name Flag

      Password Flag

      Will Retain

      Will QoS

      Will Flag

      Clean Session

      Reserved

      byte 8

      X

      X

      X

      X

      X

      X

      X

      0

      union umqtt_pkgs_connect_sign
      {
          rt_uint8_t connect_sign;
          struct {
              rt_uint8_t reserved:       1;               /* reserved bits */ 
              rt_uint8_t clean_session:  1;               /* clean session bit */ 
              rt_uint8_t will_flag:      1;               /* will flag bit */ 
              rt_uint8_t will_Qos:       2;               /* will Qos bit */ 
              rt_uint8_t will_retain:    1;               /* will retain bit */ 
              rt_uint8_t password_flag:  1;               /* password flag bit */ 
              rt_uint8_t username_flag:  1;               /* user name flag bit */ 
          } bits;
      };
      -------------------------------------------------------------------------------
      #define UMQTT_SET_CONNECT_FLAGS(user_name_flag, password_flag, will_retain, will_qos, will_flag, clean_session, reserved)    \
          (((user_name_flag & 0x01) << 7) |    \
          ((password_flag & 0x01) << 6) |  \
          ((will_retain & 0x01) << 5) |   \
          ((will_qos & 0x01) << 3) |   \
          ((will_flag & 0x01) << 2) | \
          ((clean_session & 0x01) << 1) | \
          (reserved & 0x01))
      #define UMQTT_DEF_CONNECT_FLAGS                             (UMQTT_SET_CONNECT_FLAGS(0,0,0,0,0,1,0))copymistakeCopy Success

After the above package assembly process is completed, umqtt_trans_sendthe function will be called to send the send buffer data to the Broker connected to the socket through LWIP:

/** 
 * TCP/TLS send datas on configured transport.
 *
 * @param sock the input socket
 * @param send_buf the input, transport datas buffer
 * @param buf_len the input, transport datas buffer length
 * @param timeout the input, tcp/tls transport timeout
 *
 * @return <0: failed or other error
 *         =0: success
 */
int umqtt_trans_send(int sock, const rt_uint8_t *send_buf, rt_uint32_t buf_len, int timeout)
{
    int _ret = 0;
    rt_uint32_t offset = 0U;
    while (offset < buf_len) 
    {
        _ret = send(sock, send_buf + offset, buf_len - offset, 0);
        if (_ret < 0) 
            return -errno;
        offset += _ret;
    }

    return _ret;
}copymistakeCopy Success

When uMQTT completes sending the CONNECT message, it calls umqtt_handle_readpacketthe function (after completing the CONNECT process, the function will also umqtt_threadbe called cyclically in the thread to send and receive data) to read the Broker's reply and unpack the received data:

static int umqtt_handle_readpacket(struct umqtt_client *client)
{
    int _ret = 0, _onedata = 0, _cnt = 0, _loop_cnt = 0, _remain_len = 0;
    int _temp_ret = 0;
    int _pkt_len = 0;
    int _multiplier = 1;
    int _pkt_type = 0;
    struct umqtt_msg decode_msg = { 0 };
    struct umqtt_msg_ack msg_ack = { 0 };
    struct umqtt_msg encode_msg = { 0 };
    RT_ASSERT(client);

    /* 1. 读Fixed header的第一个字节 */
    _temp_ret = umqtt_trans_recv(client->sock, client->recv_buf, 1);
    if (_temp_ret <= 0) 
    {
        _ret = UMQTT_FIN_ACK;
        LOG_W(" server fin ack! connect failed! need to reconnect!");
        goto exit;
    } 

    /* 2. 读Fixed header的Remaining length字段并解析剩余长度 */
    do {
        if (++_cnt > MAX_NO_OF_REMAINING_LENGTH_BYTES) 
        {
            _ret = UMQTT_FAILED;
            LOG_E(" umqtt packet length error!");
            goto exit;
        }
        _ret = umqtt_readpacket(client, (unsigned char *)&_onedata, 1, client->mqtt_info.recv_time_ms);
        if (_ret == UMQTT_FIN_ACK)
        {
            LOG_W(" server fin ack! connect failed! need to reconnect!");
            goto exit;
        }
        else if (_ret != UMQTT_OK) 
        {
            _ret = UMQTT_READ_FAILED;
            goto exit;
        }
        *(client->recv_buf + _cnt) = _onedata;
        _pkt_len += (_onedata & 0x7F) * _multiplier;
        _multiplier *= 0x80; 
    } while ((_onedata & 0x80) != 0);

    /* 异常处理:如果当前报文的数据长度大于缓冲区长度,会将socket缓冲中的数据全部读出来丢掉,返回UMQTT_BUFFER_TOO_SHORT错误 */
    if ((_pkt_len + 1 + _cnt) > client->mqtt_info.recv_size)
    {
        LOG_W(" socket read buffer too short! will read and delete socket buff! ");
        _loop_cnt = _pkt_len / client->mqtt_info.recv_size;

        do 
        {
            if (_loop_cnt == 0)
            {
                umqtt_readpacket(client, client->recv_buf, _pkt_len, client->mqtt_info.recv_time_ms);
                _ret = UMQTT_BUFFER_TOO_SHORT;
                LOG_W(" finish read and delete socket buff!");
                goto exit;
            }
            else 
            {
                _loop_cnt--;
                umqtt_readpacket(client, client->recv_buf, client->mqtt_info.recv_size, client->mqtt_info.recv_time_ms);
                _pkt_len -= client->mqtt_info.recv_size;
            }
        }while(1);
    }

    /* 3. 读剩余数据——可变header+有效载荷 */
    _ret = umqtt_readpacket(client, client->recv_buf + _cnt + 1, _pkt_len, client->mqtt_info.recv_time_ms);
    if (_ret == UMQTT_FIN_ACK)
    {
        LOG_W(" server fin ack! connect failed! need to reconnect!");
        goto exit;
    }
    else if (_ret != UMQTT_OK)
    {
        _ret = UMQTT_READ_FAILED;
        LOG_E(" read remain datas error!");
        goto exit;
    }

    /* 4. 解析数据包,并根据不同报文类型做相应处理 */
    rt_memset(&decode_msg, 0, sizeof(decode_msg));
    _ret = umqtt_decode(client->recv_buf, _pkt_len + _cnt + 1, &decode_msg);
    if (_ret < 0) 
    {
        _ret = UMQTT_DECODE_ERROR;
        LOG_E(" decode error!");
        goto exit;
    }
    _pkt_type = decode_msg.header.bits.type;
    switch (_pkt_type)
    {
    case UMQTT_TYPE_CONNACK:
        {
            LOG_D(" read connack cmd information!");
            set_uplink_recon_tick(client, UPLINK_NEXT_TICK);
            set_connect_status(client, UMQTT_CS_LINKED);
        }
        break;
    case UMQTT_TYPE_PUBLISH:
        {
            LOG_D(" read publish cmd information!");
            set_uplink_recon_tick(client, UPLINK_NEXT_TICK);

            if (decode_msg.header.bits.qos != UMQTT_QOS2) 
            {
                LOG_D(" qos: %d, deliver message! topic nme: %s ", decode_msg.header.bits.qos, decode_msg.msg.publish.topic_name);
                umqtt_deliver_message(client, decode_msg.msg.publish.topic_name, decode_msg.msg.publish.topic_name_len, 
                                    &(decode_msg.msg.publish));
            }

            if (decode_msg.header.bits.qos != UMQTT_QOS0)
            {   
                rt_memset(&encode_msg, 0, sizeof(encode_msg));
                encode_msg.header.bits.qos = decode_msg.header.bits.qos;
                encode_msg.header.bits.dup = decode_msg.header.bits.dup;
                if (decode_msg.header.bits.qos == UMQTT_QOS1)
                {
                    encode_msg.header.bits.type = UMQTT_TYPE_PUBACK;
                    encode_msg.msg.puback.packet_id = decode_msg.msg.publish.packet_id;
                }
                else if (decode_msg.header.bits.qos == UMQTT_QOS2)
                {
                    encode_msg.header.bits.type = UMQTT_TYPE_PUBREC;
                    add_one_qos2_msg(client, &(decode_msg.msg.publish));
                    encode_msg.msg.pubrel.packet_id = decode_msg.msg.publish.packet_id;
                    add_one_pubrec_msg(client, encode_msg.msg.pubrel.packet_id);        /* add pubrec message */
                }

                _ret = umqtt_encode(encode_msg.header.bits.type, client->send_buf, client->mqtt_info.send_size, 
                                    &encode_msg);
                if (_ret < 0)
                {
                    _ret = UMQTT_ENCODE_ERROR;
                    LOG_E(" puback / pubrec failed!");
                    goto exit;
                }
                client->send_len = _ret;

                _ret = umqtt_trans_send(client->sock, client->send_buf, client->send_len, 
                                        client->mqtt_info.send_timeout);
                if (_ret < 0) 
                {
                    _ret = UMQTT_SEND_FAILED;
                    LOG_E(" trans send failed!");
                    goto exit;                    
                }

            }
        }
        break;
    case UMQTT_TYPE_PUBACK:
        {
            LOG_D(" read puback cmd information!");
            rt_memset(&msg_ack, 0, sizeof(msg_ack));
            msg_ack.msg_type = UMQTT_TYPE_PUBACK;
            msg_ack.packet_id = decode_msg.msg.puback.packet_id;
            _ret = rt_mq_send(client->msg_queue, &msg_ack, sizeof(struct umqtt_msg_ack));
            if (_ret != RT_EOK) 
            {
                _ret = UMQTT_SEND_FAILED;
                LOG_E(" mq send failed!");
                goto exit;
            }
            set_uplink_recon_tick(client, UPLINK_NEXT_TICK);
        }
        break;
    case UMQTT_TYPE_PUBREC:
        {
            LOG_D(" read pubrec cmd information!");
            rt_memset(&msg_ack, 0, sizeof(msg_ack));
            msg_ack.msg_type = UMQTT_TYPE_PUBREC;
            msg_ack.packet_id = decode_msg.msg.puback.packet_id;
            _ret = rt_mq_send(client->msg_queue, &msg_ack, sizeof(struct umqtt_msg_ack));
            if (_ret != RT_EOK)
            {
                _ret = UMQTT_SEND_FAILED;
                LOG_E(" mq send failed!");
                goto exit;
            }
            set_uplink_recon_tick(client, UPLINK_NEXT_TICK);
        }
        break;
    case UMQTT_TYPE_PUBREL:
        {
            LOG_D(" read pubrel cmd information!");   

            rt_memset(&encode_msg, 0, sizeof(encode_msg));
            encode_msg.header.bits.type = UMQTT_TYPE_PUBCOMP;
            encode_msg.header.bits.qos = decode_msg.header.bits.qos;
            encode_msg.header.bits.dup = decode_msg.header.bits.dup;            
            encode_msg.msg.pubrel.packet_id = decode_msg.msg.pubrec.packet_id;

            /* publish callback, and delete callback */
            qos2_publish_delete(client, encode_msg.msg.pubrel.packet_id);

            /* delete array numbers! */
            clear_one_pubrec_msg(client, encode_msg.msg.pubrel.packet_id);

            _ret = umqtt_encode(UMQTT_TYPE_PUBCOMP, client->send_buf, client->mqtt_info.send_size, &encode_msg);
            if (_ret < 0)
            {
                _ret = UMQTT_ENCODE_ERROR;
                LOG_E(" pubcomp failed!");
                goto exit;
            }
            client->send_len = _ret;

            _ret = umqtt_trans_send(client->sock, client->send_buf, client->send_len, 
                                    client->mqtt_info.send_timeout);
            if (_ret < 0) 
            {
                _ret = UMQTT_SEND_FAILED;
                LOG_E(" trans send failed!");
                goto exit;                    
            }

        }
        break;
    case UMQTT_TYPE_PUBCOMP:
        {
            LOG_D(" read pubcomp cmd information!");

            rt_memset(&msg_ack, 0, sizeof(msg_ack));
            msg_ack.msg_type = UMQTT_TYPE_PUBCOMP;
            msg_ack.packet_id = decode_msg.msg.pubcomp.packet_id;
            _ret = rt_mq_send(client->msg_queue, &msg_ack, sizeof(struct umqtt_msg_ack));
            if (_ret != RT_EOK) 
            {
                _ret = UMQTT_SEND_FAILED;
                goto exit;
            }            
        }
        break;
    case UMQTT_TYPE_SUBACK:
        {
            LOG_D(" read suback cmd information!");

            rt_memset(&msg_ack, 0, sizeof(msg_ack));
            msg_ack.msg_type = UMQTT_TYPE_SUBACK;
            msg_ack.packet_id = decode_msg.msg.suback.packet_id;

            set_uplink_recon_tick(client, UPLINK_NEXT_TICK);

            _ret = rt_mq_send(client->msg_queue, &msg_ack, sizeof(struct umqtt_msg_ack));
            if (_ret != RT_EOK) 
            {
                _ret = UMQTT_SEND_FAILED;
                goto exit;
            }
        }
        break;
    case UMQTT_TYPE_UNSUBACK:
        {
            LOG_D(" read unsuback cmd information!");

            rt_memset(&msg_ack, 0, sizeof(msg_ack));
            msg_ack.msg_type = UMQTT_TYPE_UNSUBACK;
            msg_ack.packet_id = decode_msg.msg.unsuback.packet_id;

            set_uplink_recon_tick(client, UPLINK_NEXT_TICK);

            _ret = rt_mq_send(client->msg_queue, &msg_ack, sizeof(struct umqtt_msg_ack));
            if (_ret != RT_EOK) 
            {
                _ret = UMQTT_SEND_FAILED;
                goto exit;
            }

        }
        break;
    case UMQTT_TYPE_PINGRESP:
        {
            LOG_I(" ping resp! broker -> client! now tick: %d ", rt_tick_get());
            set_uplink_recon_tick(client, UPLINK_NEXT_TICK);
        }
        break;
    default:
        {
            LOG_W(" not right type(0x%02x)!", _pkt_type);
        }
        break;
    }

exit:
    return _ret;
}copymistakeCopy Success

Briefly describe the above key steps:

  1. Read the first byte of the Fixed header

    Here the function is called umqtt_trans_recvto read the socket data:

    /** 
     * TCP/TLS receive datas on configured transport.
     *
     * @param sock the input socket
     * @param recv_buf the output, receive datas buffer
     * @param buf_len the input, receive datas buffer length
     *
     * @return <=0: failed or other error
     *         >0: receive datas length
     */
    int umqtt_trans_recv(int sock, rt_uint8_t *recv_buf, rt_uint32_t buf_len)
    {
        return recv(sock, recv_buf, buf_len, 0);
        // return read(sock, recv_buf, buf_len);
    }
    -------------------------------------------------------------------------
    #define recv(s, mem, len, flags)                           sal_recvfrom(s, mem, len, flags, NULL, NULL)copymistakeCopy Success

    sal_recvfromIt can be seen that this function is actually a wrapper for the SAL layer function, which is used to read data of length buf_len from the corresponding sock to recv_buf.

  2. Read the Remaining length field of the Fixed header and parse the remaining length

    I will not go into details here. The algorithm for this part refers to the rules in [2.1.3 Remaining Length](#2.1.3 Remaining Length).

  3. Read remaining data - variable header + payload

    No more details.

  4. Parse data packets and process them accordingly according to different message types

    Here is a key structure and unpacking function:

    • struct umqtt_msg decode_msg = { 0 };copymistakeCopy Success

      For structure members, please refer to **[4.4. uMQTT sends a package](#4.4. uMQTT sends a package)**

    • /** 
       * parse the data according to the format
       *
       * @param recv_buf the input, the raw buffer data, of the correct length determined by the remaining length field
       * @param recv_buf_len the input, the length in bytes of the data in the supplied buffer
       * @param message the output datas
       *
       * @return <0: failed or other error
       *         =0: success
       */
      int umqtt_decode(rt_uint8_t *recv_buf, size_t recv_buf_len, struct umqtt_msg *message)
      {
          int _ret = 0;
          rt_uint8_t* curdata = recv_buf;
          enum umqtt_type type;
          if (message == RT_NULL) 
          {
              _ret = UMQTT_INPARAMS_NULL;
              LOG_E(" umqtt decode inparams null!");
              goto exit;
          }
      
          message->header.byte = umqtt_readChar(&curdata);
          type = message->header.bits.type;
      
          switch (type)
          {
          case UMQTT_TYPE_CONNACK:
              _ret = umqtt_connack_decode(&(message->msg.connack), recv_buf, recv_buf_len);
              break;
          case UMQTT_TYPE_PUBLISH:
              _ret = umqtt_publish_decode(message, recv_buf, recv_buf_len);
              break;
          case UMQTT_TYPE_PUBACK:
              _ret = umqtt_puback_decode(message, recv_buf, recv_buf_len);
              break;
          case UMQTT_TYPE_PUBREC:
              // _ret = umqtt_pubrec_decode();
              break;
          case UMQTT_TYPE_PUBREL:
              // _ret = umqtt_pubrel_decode();
              break;
          case UMQTT_TYPE_PUBCOMP:
              // _ret = umqtt_pubcomp_decode();
              break;
          case UMQTT_TYPE_SUBACK:
              _ret = umqtt_suback_decode(&(message->msg.suback), recv_buf, recv_buf_len);
              break;
          case UMQTT_TYPE_UNSUBACK:
              _ret = umqtt_unsuback_decode(&(message->msg.unsuback), recv_buf, recv_buf_len);
              break;
          case UMQTT_TYPE_PINGRESP:
              // _ret = umqtt_pingresp_encode();
              break;
          default:
              break;
          }
      exit:
          return _ret;
      }copymistakeCopy Success

      There are many types of messages, but we will only take the CONNECT message as an example:

      static int umqtt_connack_decode(struct umqtt_pkgs_connack *connack_msg, rt_uint8_t* buf, int buflen)
      {
          MQTTHeader header = {0};
          unsigned char* curdata = buf;
          unsigned char* enddata = NULL;
          int rc = 0;
          int mylen;
      
          header.byte = umqtt_readChar(&curdata);
          if (header.bits.type != UMQTT_TYPE_CONNACK)
          {
              rc = UMQTT_FAILED;
              LOG_E(" not connack type!");
              goto exit;
          }
      
          curdata += (rc = umqtt_pkgs_decodeBuf(curdata, &mylen)); /* read remaining length */
          enddata = curdata + mylen;
          if (enddata - curdata < 2)
          {
              LOG_D(" enddata:%d, curdata:%d, mylen:%d", enddata, curdata, mylen);
              goto exit;
          }
      
          connack_msg->connack_flags.connack_sign = umqtt_readChar(&curdata);
          connack_msg->ret_code = umqtt_readChar(&curdata);
      exit:
          return rc;
      }
      -----------------------------------------------------------------------------------------
      union umqtt_pkgs_connack_sign
      {
          rt_uint8_t connack_sign;
          struct {
              rt_uint8_t sp:             1;               /* current session bit */ 
              rt_uint8_t reserved:       7;               /* retain bit */ 
          } bits;
      };
      struct umqtt_pkgs_connack
      {
          /* variable header */ 
          union umqtt_pkgs_connack_sign connack_flags;    /* connect flags */ 
          enum umqtt_connack_retcode ret_code;            /* connect return code */ 
          /* payload = NULL */ 
      };copymistakeCopy Success

      It is still the familiar routine: read Fixed header → read Remaining length → read Variable header to parse related flags

  5. UMQTT_TYPE_CONNACK:

    Call set_uplink_recon_tick(client, UPLINK_NEXT_TICK)the function to set the next reconnection tick value, and call set_connect_status(client, UMQTT_CS_LINKED)the function to set the uMQTT client state 已连接.

At this point, the sending and receiving process of the CONNECT message has been completed. The next step is to start umqtt_threadthe thread and call umqtt_handle_readpacketthe function to process the data message received from the Broker server. The message processing process is similar to the above and will not be repeated. For the specific content and related processes, please refer to Figure 4-2:

Figure 4-2 MQTT communication flow chart

In summary, a flowchart is used to briefly describe some important function calls of uMQTT, as shown in Figure 4-3. Since many details are difficult to show, it is also necessary to understand its functional flow from the actual code.

Figure 4-3 umqtt important function flow chart

uMQTT software package is independently developed by RT-Thread, based on the client implementation of MQTT 3.1.1 protocol, which provides basic functions for devices to communicate with MQTT Broker. Gitee homepage .

umqtt_layered diagram

The various message types of this structure correspond exactly to the various control message types in [Chapter 2.1.1](#2.1.1 MQTT Control Packet type) (PINGREQ and PINGRESP messages only need two bytes each, , so there is no need to use structures to manage them). umqtt_encodeDifferent packet assembly functions are called through functions, the structures of the corresponding formats are filled, and then sent to the Broker server:

Since there are many types of messages, let's take the message (variable header - "protocol name", "protocol level", "connection flag", "keep alive interval (seconds)", payload - "client identifier", "will topic", "will message", "user name", "password") as an example to briefly describe the package assembly process of uMQTT:

MQTT flow chart

umqtt flow chart
🤟
2. MQTT message structure
2.1. Fixed header
2.1.1 MQTT Control Packet type
2.1.2 Flags specific to each MQTT Control Packet type
2.1.3 Remaining Length
Variable header
Payload
3. MQTT message format
MQTT Version 3.1.1 | 3 MQTT Control Packets
MQTT 3.1.1 Protocol Chinese Version | MQTT Chinese Website
Chapter 3 MQTT Control Packets
Chapter 3 – MQTT Control Messages
4. Implementation of uMQTT
Click here for GitHub homepage and
here for
4.1. uMQTT framework
uMQTT Client
uMQTT and LWIP
4.4. uMQTT sends group packets
refer to here
CONNECT
4.5. uMQTT receiving and unpacking
4.6. uMQTT data flow
1. MQTT background application
MQTT Chinese website
MQTT official website