Inter-thread communication

The previous chapter discussed synchronization between threads, and mentioned concepts such as semaphores, mutexes, and event sets; this chapter continues the content of the previous chapter and explains inter-thread communication. In bare metal programming, global variables are often used for communication between functions. For example, some functions may change the value of a global variable due to some operations, and another function reads this global variable and performs corresponding actions based on the read global variable value to achieve the purpose of communication and collaboration. RT-Thread provides more tools to help pass information between different threads, and this chapter will introduce these tools in detail. After studying this chapter, you will learn how to use mailboxes, message queues, and signals for communication between threads.

Mailbox service is a typical inter-thread communication method in real-time operating systems. For example, there are two threads. Thread 1 detects the key status and sends it, while Thread 2 reads the key status and changes the LED on and off accordingly. Here, the mailbox method can be used for communication. Thread 1 sends the key status as an email to the mailbox, and Thread 2 reads the email in the mailbox to obtain the key status and turns the LED on and off.

Thread 1 here can also be expanded into multiple threads. For example, there are three threads in total, thread 1 detects and sends the key status, thread 2 detects and sends ADC sampling information, and thread 3 performs different operations according to the type of information received.

The mailbox of the RT-Thread operating system is used for inter-thread communication, which is characterized by low overhead and high efficiency. Each email in the mailbox can only hold a fixed 4-byte content (for a 32-bit processing system, the size of a pointer is 4 bytes, so one email can hold just one pointer). A typical mailbox is also called an exchange message. As shown in the figure below, a thread or interrupt service routine sends a 4-byte email to the mailbox, and one or more threads can receive and process these emails from the mailbox.

The non-blocking mail sending process can be safely applied to interrupt services. It is an effective means for threads, interrupt services, and timers to send messages to threads. Generally speaking, the mail receiving process may be blocking, depending on whether there are mails in the mailbox and the timeout set when receiving mails. When there are no mails in the mailbox and the timeout is not 0, the mail receiving process will become blocking. In this case, only threads can receive mails.

When a thread sends mail to a mailbox, if the mailbox is not full, the mail will be copied to the mailbox. If the mailbox is full, the sending thread can set a timeout, choose to wait and suspend, or directly return - RT_EFULL. If the sending thread chooses to suspend and wait, then when the mail in the mailbox is collected and space is freed up, the waiting and suspended sending thread will be awakened to continue sending.

When a thread receives mail from a mailbox, if the mailbox is empty, the receiving thread can choose whether to wait and suspend until a new mail is received and wake up, or set a timeout. When the set timeout is reached and the mailbox still does not receive mail, the thread that chooses to timeout will be woken up and return - RT_ETIMEOUT. If there is mail in the mailbox, the receiving thread will copy the 4-byte mail in the mailbox to the receiving buffer.

In RT-Thread, the mailbox control block is a data structure used by the operating system to manage mailboxes, represented by the structure struct rt_mailbox. Another C expression, rt_mailbox_t, represents the handle of the mailbox, which is implemented in C language as a pointer to the mailbox control block. For the detailed definition of the mailbox control block structure, please see the following code:

struct rt_mailbox
{
    struct rt_ipc_object parent;

    rt_uint32_t* msg_pool;                /* 邮箱缓冲区的开始地址 */
    rt_uint16_t size;                     /* 邮箱缓冲区的大小     */

    rt_uint16_t entry;                    /* 邮箱中邮件的数目     */
    rt_uint16_t in_offset, out_offset;    /* 邮箱缓冲的进出指针   */
    rt_list_t suspend_sender_thread;      /* 发送线程的挂起等待队列 */
};
typedef struct rt_mailbox* rt_mailbox_t;copymistakeCopy Success

The rt_mailbox object is derived from rt_ipc_object and is managed by the IPC container.

The mailbox control block is a structure that contains important parameters related to events and plays an important role in the implementation of mailbox functions. The mailbox-related interface is shown in the figure below. The operations on a mailbox include: creating/initializing a mailbox, sending mail, receiving mail, and deleting/leaving a mailbox.

Creating and Deleting Mailboxes

To dynamically create a mailbox object, you can call the following function interface:

rt_mailbox_t rt_mb_create (const char* name, rt_size_t size, rt_uint8_t flag);copymistakeCopy Success

When creating a mailbox object, a mailbox object will be allocated from the object manager first, and then a memory space will be dynamically allocated to the mailbox to store mails. The size of this memory is equal to the product of the mail size (4 bytes) and the capacity of the mailbox. Then the number of received mails and the offset of the sent mails in the mailbox are initialized. The following table describes the input parameters and return values ​​of this function:

Note

Note: RT_IPC_FLAG_FIFO is a non-real-time scheduling method. Unless the application is very concerned about first-come-first-served, and you clearly understand that all threads involving the mailbox will become non-real-time threads, you can use RT_IPC_FLAG_FIFO. Otherwise, it is recommended to use RT_IPC_FLAG_PRIO, that is, to ensure the real-time nature of the thread.

When a mailbox created with rt_mb_create() is no longer in use, it should be deleted to free up the corresponding system resources. Once the operation is completed, the mailbox will be permanently deleted. The function interface for deleting a mailbox is as follows:

rt_err_t rt_mb_delete (rt_mailbox_t mb);copymistakeCopy Success

When deleting a mailbox, if a thread is suspended on the mailbox object, the kernel first wakes up all threads suspended on the mailbox (the thread return value is -RT_ERROR), then releases the memory used by the mailbox, and finally deletes the mailbox object. The following table describes the input parameters and return values ​​of this function:

Initializing and uninitializing a mailbox

Initializing a mailbox is similar to creating a mailbox, except that it is used to initialize static mailbox objects. Unlike creating a mailbox, the memory of a static mailbox object is allocated by the compiler when the system is compiled, and is generally placed in the read-write data segment or the uninitialized data segment. The rest of the initialization work is the same as when creating a mailbox. The function interface is as follows:

  rt_err_t rt_mb_init(rt_mailbox_t mb,
                    const char* name,
                    void* msgpool,
                    rt_size_t size,
                    rt_uint8_t flag)copymistakeCopy Success

When initializing a mailbox, the function interface needs to obtain the mailbox object control block that the user has applied for, the pointer to the buffer, the mailbox name, and the mailbox capacity (the number of emails that can be stored). The following table describes the input parameters and return values ​​of the function:

The size parameter here specifies the capacity of the mailbox, that is, if the number of bytes of the buffer pointed to by msgpool is N, then the capacity of the mailbox should be N/4.

Detaching a mailbox will detach the statically initialized mailbox object from the kernel object manager. Detaching a mailbox uses the following interface:

rt_err_t rt_mb_detach(rt_mailbox_t mb);copymistakeCopy Success

After using this function interface, the kernel first wakes up all threads hanging on the mailbox (the thread gets the return value - RT_ERROR), and then detaches the mailbox object from the kernel object manager. The following table describes the input parameters and return values ​​of this function:

Send Email

A thread or interrupt service program can send emails to other threads through the mailbox. The email sending function interface is as follows:

rt_err_t rt_mb_send (rt_mailbox_t mb, rt_uint32_t value);copymistakeCopy Success

The mail sent can be 32-bit data in any format, an integer value or a pointer to a buffer. When the mailbox is full, the thread or interrupt program that sends the mail will receive a return value of -RT_EFULL. The following table describes the input parameters and return values ​​of this function:

Waiting mode to send mail

Users can also send emails to the specified mailbox through the following function interface:

rt_err_t rt_mb_send_wait (rt_mailbox_t mb,
                      rt_uint32_t value,
                      rt_int32_t timeout);copymistakeCopy Success

The difference between rt_mb_send_wait() and rt_mb_send() is that there is a waiting time. If the mailbox is full, the sending thread will wait for the mailbox to be freed up due to receiving mail according to the set timeout parameter. If the set timeout period is reached and there is still no free space, the sending thread will be awakened and return an error code. The following table describes the input parameters and return values ​​of this function:

Send urgent email

The process of sending an urgent email is almost the same as sending an email. The only difference is that when sending an urgent email, the email is directly inserted into the queue and placed at the head of the email queue. In this way, the recipient can receive the urgent email first and process it in time. The function interface for sending an urgent email is as follows:

rt_err_t rt_mb_urgent (rt_mailbox_t mb, rt_ubase_t value);copymistakeCopy Success

The following table describes the input parameters and return values ​​of this function:

incoming mail

Only when there is mail in the mailbox, the receiver can get the mail immediately and return the return value of RT_EOK. Otherwise, the receiving thread will be suspended in the waiting thread queue of the mailbox or return directly according to the timeout setting. The receiving mail function interface is as follows:

rt_err_t rt_mb_recv (rt_mailbox_t mb, rt_uint32_t* value, rt_int32_t timeout);copymistakeCopy Success

When receiving mail, the recipient needs to specify the mailbox handle for receiving mail, the storage location of the received mail, and the maximum waiting time. If the timeout is set when receiving, if the mail is still not received within the specified time, - RT_ETIMEOUT will be returned. The following table describes the input parameters and return values ​​of this function:

This is a mailbox application routine, which initializes two static threads and a static mailbox object. One thread sends mail to the mailbox, and the other thread receives mail from the mailbox. The code is as follows:

Mailbox usage routine

Note: RT-Thread 5.0 and later versions have ALIGNchanged the keyword to rt_align, so please pay attention to the modification when using it.

#include <rtthread.h>

#define THREAD_PRIORITY      10
#define THREAD_TIMESLICE     5

/* 邮箱控制块 */
static struct rt_mailbox mb;
/* 用于放邮件的内存池 */
static char mb_pool[128];

static char mb_str1[] = "I'm a mail!";
static char mb_str2[] = "this is another mail!";
static char mb_str3[] = "over";

ALIGN(RT_ALIGN_SIZE)
static char thread1_stack[1024];
static struct rt_thread thread1;

/* 线程 1 入口 */
static void thread1_entry(void *parameter)
{
    char *str;

    while (1)
    {
        rt_kprintf("thread1: try to recv a mail\n");

        /* 从邮箱中收取邮件 */
        if (rt_mb_recv(&mb, (rt_uint32_t *)&str, RT_WAITING_FOREVER) == RT_EOK)
        {
            rt_kprintf("thread1: get a mail from mailbox, the content:%s\n", str);
            if (str == mb_str3)
                break;

            /* 延时 100ms */
            rt_thread_mdelay(100);
        }
    }
    /* 执行邮箱对象脱离 */
    rt_mb_detach(&mb);
}

ALIGN(RT_ALIGN_SIZE)
static char thread2_stack[1024];
static struct rt_thread thread2;

/* 线程 2 入口 */
static void thread2_entry(void *parameter)
{
    rt_uint8_t count;

    count = 0;
    while (count < 10)
    {
        count ++;
        if (count & 0x1)
        {
            /* 发送 mb_str1 地址到邮箱中 */
            rt_mb_send(&mb, (rt_uint32_t)&mb_str1);
        }
        else
        {
            /* 发送 mb_str2 地址到邮箱中 */
            rt_mb_send(&mb, (rt_uint32_t)&mb_str2);
        }

        /* 延时 200ms */
        rt_thread_mdelay(200);
    }

    /* 发送邮件告诉线程 1,线程 2 已经运行结束 */
    rt_mb_send(&mb, (rt_uint32_t)&mb_str3);
}

int mailbox_sample(void)
{
    rt_err_t result;

    /* 初始化一个 mailbox */
    result = rt_mb_init(&mb,
                        "mbt",                      /* 名称是 mbt */
                        &mb_pool[0],                /* 邮箱用到的内存池是 mb_pool */
                        sizeof(mb_pool) / 4,        /* 邮箱中的邮件数目,因为一封邮件占 4 字节 */
                        RT_IPC_FLAG_FIFO);          /* 采用 FIFO 方式进行线程等待 */
    if (result != RT_EOK)
    {
        rt_kprintf("init mailbox failed.\n");
        return -1;
    }

    rt_thread_init(&thread1,
                   "thread1",
                   thread1_entry,
                   RT_NULL,
                   &thread1_stack[0],
                   sizeof(thread1_stack),
                   THREAD_PRIORITY, THREAD_TIMESLICE);
    rt_thread_startup(&thread1);

    rt_thread_init(&thread2,
                   "thread2",
                   thread2_entry,
                   RT_NULL,
                   &thread2_stack[0],
                   sizeof(thread2_stack),
                   THREAD_PRIORITY, THREAD_TIMESLICE);
    rt_thread_startup(&thread2);
    return 0;
}

/* 导出到 msh 命令列表中 */
MSH_CMD_EXPORT(mailbox_sample, mailbox sample);copymistakeCopy Success

The simulation results are as follows:

 \ | /
- RT -     Thread Operating System
 / | \     3.1.0 build Aug 27 2018
 2006 - 2018 Copyright by rt-thread team
msh >mailbox_sample
thread1: try to recv a mail
thread1: get a mail from mailbox, the content:I'm a mail!
msh >thread1: try to recv a mail
thread1: get a mail from mailbox, the content:this is another mail!

thread1: try to recv a mail
thread1: get a mail from mailbox, the content:this is another mail!
thread1: try to recv a mail
thread1: get a mail from mailbox, the content:overcopymistakeCopy Success

The example demonstrates how to use the mailbox. Thread 2 sends mails, a total of 11 times; Thread 1 receives mails, a total of 11 mails are received, the mail content is printed out, and the process is judged to be finished.

Mailbox is a simple way to pass messages between threads, which is characterized by low overhead and high efficiency. In the implementation of RT-Thread operating system, one 4-byte email can be delivered at a time, and the mailbox has certain storage functions and can cache a certain number of emails (the number of emails is determined by the capacity specified when the mailbox is created and initialized). The maximum length of an email in the mailbox is 4 bytes, so the mailbox can be used to pass messages of no more than 4 bytes. Since 4 bytes of content on a 32-bit system can just hold a pointer, when a larger message needs to be passed between threads, a pointer to a buffer can be sent to the mailbox as an email, that is, the mailbox can also pass pointers, for example:

struct msg
{
    rt_uint8_t *data_ptr;
    rt_uint32_t data_size;
};copymistakeCopy Success

For such a message structure, which contains the pointer data_ptr pointing to the data and the variable data_size of the data block length, when a thread needs to send this message to another thread, it can use the following operations:

struct msg* msg_ptr;

msg_ptr = (struct msg*)rt_malloc(sizeof(struct msg));
msg_ptr->data_ptr = ...; /* 指向相应的数据块地址 */
msg_ptr->data_size = len; /* 数据块的长度 */
/* 发送这个消息指针给 mb 邮箱 */
rt_mb_send(mb, (rt_uint32_t)msg_ptr);copymistakeCopy Success

In the receiving thread, because what is received is a pointer, and msg_ptr is a newly allocated memory block, the corresponding memory block needs to be released after the receiving thread completes the processing:

struct msg* msg_ptr;
if (rt_mb_recv(mb, (rt_uint32_t*)&msg_ptr) == RT_EOK)
{
    /* 在接收线程处理完毕后,需要释放相应的内存块 */
    rt_free(msg_ptr);
}copymistakeCopy Success

Message queue is another common way of communication between threads, which is an extension of mailbox. It can be applied in many occasions: message exchange between threads, receiving variable-length data through serial port, etc.

The message queue can receive messages of unfixed length from threads or interrupt service routines and cache the messages in its own memory space. Other threads can also read corresponding messages from the message queue, and when the message queue is empty, the reading thread can be suspended. When a new message arrives, the suspended thread will be awakened to receive and process the message. The message queue is an asynchronous communication method.

As shown in the figure below, a thread or an interrupt service routine can put one or more messages into a message queue. Similarly, one or more threads can get messages from a message queue. When there are multiple messages sent to a message queue, the message that enters the message queue first is usually passed to the thread first. In other words, the thread gets the message that enters the message queue first, which is the first-in-first-out principle (FIFO).

The message queue object of the RT-Thread operating system consists of multiple elements. When a message queue is created, it is assigned a message queue control block: message queue name, memory buffer, message size, and queue length, etc. At the same time, each message queue object contains multiple message boxes, each of which can store a message; the first and last message boxes in the message queue are called the message list head and message list tail, respectively, corresponding to msg_queue_head and msg_queue_tail in the message queue control block; some message boxes may be empty, and they form a free message box list through msg_queue_free. The total number of message boxes in all message queues is the length of the message queue, which can be specified when the message queue is created.

In RT-Thread, the message queue control block is a data structure used by the operating system to manage the message queue, represented by the structure struct rt_messagequeue. Another C expression rt_mq_t represents the handle of the message queue, which is implemented in C language as a pointer to the message queue control block. For the detailed definition of the message queue control block structure, please see the following code:

struct rt_messagequeue
{
    struct rt_ipc_object parent;

    void* msg_pool;                     /* 指向存放消息的缓冲区的指针 */

    rt_uint16_t msg_size;               /* 每个消息的长度 */
    rt_uint16_t max_msgs;               /* 最大能够容纳的消息数 */

    rt_uint16_t entry;                  /* 队列中已有的消息数 */

    void* msg_queue_head;               /* 消息链表头 */
    void* msg_queue_tail;               /* 消息链表尾 */
    void* msg_queue_free;               /* 空闲消息链表 */

    rt_list_t suspend_sender_thread;    /* 发送线程的挂起等待队列 */
};
typedef struct rt_messagequeue* rt_mq_t;copymistakeCopy Success

The rt_messagequeue object is derived from rt_ipc_object and is managed by the IPC container.

The message queue control block is a structure that contains important parameters related to the message queue and plays an important role in the function implementation of the message queue. The relevant interface of the message queue is shown in the figure below. The operations on a message queue include: creating a message queue - sending messages - receiving messages - deleting a message queue.

Creating and deleting message queues

Before using a message queue, it should be created or an existing static message queue object should be initialized. The function interface for creating a message queue is as follows:

rt_mq_t rt_mq_create(const char* name, rt_size_t msg_size,
            rt_size_t max_msgs, rt_uint8_t flag);copymistakeCopy Success

When creating a message queue, first allocate a message queue object from the object manager, then allocate a piece of memory space to the message queue object and organize it into a free message list. The size of this memory = [message size + message header (for list connection) size] x the maximum number of message queues. Then initialize the message queue. At this time, the message queue is empty. The following table describes the input parameters and return values ​​of this function:

Note

Note: RT_IPC_FLAG_FIFO is a non-real-time scheduling method. Unless the application is very concerned about first-come-first-served, and you clearly understand that all threads involving the message queue will become non-real-time threads, you can use RT_IPC_FLAG_FIFO. Otherwise, it is recommended to use RT_IPC_FLAG_PRIO, that is, to ensure the real-time nature of the thread.

When a message queue is no longer in use, it should be deleted to free up system resources. Once the operation is completed, the message queue will be permanently deleted. The function interface for deleting a message queue is as follows:

rt_err_t rt_mq_delete(rt_mq_t mq);copymistakeCopy Success

When deleting a message queue, if there are threads suspended on the message queue waiting queue, the kernel first wakes up all threads suspended on the message waiting queue (the thread return value is - RT_ERROR), then releases the memory used by the message queue, and finally deletes the message queue object. The following table describes the input parameters and return values ​​of this function:

Initialize and leave the message queue

Initializing a static message queue object is similar to creating a message queue object, except that the memory of a static message queue object is allocated by the compiler when the system is compiled, and is generally placed in the read data segment or the uninitialized data segment. Before using this type of static message queue object, it needs to be initialized. The function interface for initializing a message queue object is as follows:

rt_err_t rt_mq_init(rt_mq_t mq, const char* name,
                        void *msgpool, rt_size_t msg_size,
                        rt_size_t pool_size, rt_uint8_t flag);copymistakeCopy Success

When initializing the message queue, the interface requires the handle of the message queue object (i.e., the pointer to the message queue object control block), message queue name, message buffer pointer, message size, and message queue buffer size that the user has applied for. After the message queue is initialized, all messages are hung on the free message list, and the message queue is empty. The following table describes the input parameters and return values ​​of this function:

Detaching a message queue will cause the message queue object to be detached from the kernel object manager. Detaching a message queue uses the following interface:

rt_err_t rt_mq_detach(rt_mq_t mq);copymistakeCopy Success

After using this function interface, the kernel first wakes up all threads hanging on the message waiting queue object (the thread return value is -RT_ERROR), and then detaches the message queue object from the kernel object manager. The following table describes the input parameters and return values ​​of this function:

Send Message

A thread or an interrupt service program can send messages to a message queue. When sending a message, the message queue object first takes a free message block from the free message list, copies the message content sent by the thread or interrupt service program to the message block, and then hangs the message block at the end of the message queue. The sender can successfully send a message if and only if there are available free message blocks on the free message list; when there are no available message blocks on the free message list, it means that the message queue is full. At this time, the thread or interrupt program sending the message will receive an error code (-RT_EFULL). The function interface for sending a message is as follows:

rt_err_t rt_mq_send (rt_mq_t mq, void* buffer, rt_size_t size);copymistakeCopy Success

When sending a message, the sender needs to specify the object handle of the message queue to be sent (i.e., the pointer to the message queue control block), and specify the message content and message size to be sent. After sending a normal message, the head message on the free message list is transferred to the end of the message queue. The following table describes the input parameters and return values ​​of this function:

Waiting mode to send a message

Users can also send messages to the specified message queue through the following function interface:

rt_err_t rt_mq_send_wait(rt_mq_t     mq,
                         const void *buffer,
                         rt_size_t   size,
                         rt_int32_t  timeout);copymistakeCopy Success

The difference between rt_mq_send_wait() and rt_mq_send() is that there is a waiting time. If the message queue is full, the sending thread will wait according to the set timeout parameter. If there is still no space after the set timeout, the sending thread will be awakened and return an error code. The following table describes the input parameters and return values ​​of this function:

Send urgent message

The process of sending an emergency message is almost the same as sending a message. The only difference is that when sending an emergency message, the message block taken from the idle message list is not hung at the end of the message queue, but at the head of the queue. In this way, the receiver can receive the emergency message first and process the message in time. The function interface for sending an emergency message is as follows:

rt_err_t rt_mq_urgent(rt_mq_t mq, void* buffer, rt_size_t size);copymistakeCopy Success

The following table describes the input parameters and return values ​​of this function:

Receiving Messages

The receiver can only receive messages when there are messages in the message queue. Otherwise, the receiver will be suspended on the waiting thread queue of the message queue or return directly according to the timeout setting. The message receiving function interface is as follows:

rt_ssize_t rt_mq_recv (rt_mq_t mq, void* buffer,
                    rt_size_t size, rt_int32_t timeout);copymistakeCopy Success

When receiving a message, the receiver needs to specify the handle of the message queue object where the message is stored, and specify a memory buffer where the received message content will be copied. In addition, the timeout period for when the message cannot be retrieved in time must be specified. As shown in the figure below, after receiving a message, the head message on the message queue is transferred to the end of the free message list. The following table describes the input parameters and return values ​​of this function:

This is an application routine of a message queue. Two static threads are initialized in the routine. One thread will receive messages from the message queue; the other thread will send ordinary messages and emergency messages to the message queue at regular intervals, as shown in the following code:

Message queue usage routine

Note: RT-Thread 5.0 and later versions have ALIGNchanged the keyword to rt_align, so please pay attention to the modification when using it.

#include <rtthread.h>

/* 消息队列控制块 */
static struct rt_messagequeue mq;
/* 消息队列中用到的放置消息的内存池 */
static rt_uint8_t msg_pool[2048];

ALIGN(RT_ALIGN_SIZE)
static char thread1_stack[1024];
static struct rt_thread thread1;
/* 线程 1 入口函数 */
static void thread1_entry(void *parameter)
{
    char buf = 0;
    rt_uint8_t cnt = 0;

    while (1)
    {
        /* 从消息队列中接收消息 */
        if (rt_mq_recv(&mq, &buf, sizeof(buf), RT_WAITING_FOREVER) > 0)
        {
            rt_kprintf("thread1: recv msg from msg queue, the content:%c\n", buf);
            if (cnt == 19)
            {
                break;
            }
        }
        /* 延时 50ms */
        cnt++;
        rt_thread_mdelay(50);
    }
    rt_kprintf("thread1: detach mq \n");
    rt_mq_detach(&mq);
}

ALIGN(RT_ALIGN_SIZE)
static char thread2_stack[1024];
static struct rt_thread thread2;
/* 线程 2 入口 */
static void thread2_entry(void *parameter)
{
    int result;
    char buf = 'A';
    rt_uint8_t cnt = 0;

    while (1)
    {
        if (cnt == 8)
        {
            /* 发送紧急消息到消息队列中 */
            result = rt_mq_urgent(&mq, &buf, 1);
            if (result != RT_EOK)
            {
                rt_kprintf("rt_mq_urgent ERR\n");
            }
            else
            {
                rt_kprintf("thread2: send urgent message - %c\n", buf);
            }
        }
        else if (cnt>= 20)/* 发送 20 次消息之后退出 */
        {
            rt_kprintf("message queue stop send, thread2 quit\n");
            break;
        }
        else
        {
            /* 发送消息到消息队列中 */
            result = rt_mq_send(&mq, &buf, 1);
            if (result != RT_EOK)
            {
                rt_kprintf("rt_mq_send ERR\n");
            }

            rt_kprintf("thread2: send message - %c\n", buf);
        }
        buf++;
        cnt++;
        /* 延时 5ms */
        rt_thread_mdelay(5);
    }
}

/* 消息队列示例的初始化 */
int msgq_sample(void)
{
    rt_err_t result;

    /* 初始化消息队列 */
    result = rt_mq_init(&mq,
                        "mqt",
                        &msg_pool[0],             /* 内存池指向 msg_pool */
                        1,                          /* 每个消息的大小是 1 字节 */
                        sizeof(msg_pool),        /* 内存池的大小是 msg_pool 的大小 */
                        RT_IPC_FLAG_PRIO);       /* 如果有多个线程等待,优先级大小的方法分配消息 */

    if (result != RT_EOK)
    {
        rt_kprintf("init message queue failed.\n");
        return -1;
    }

    rt_thread_init(&thread1,
                   "thread1",
                   thread1_entry,
                   RT_NULL,
                   &thread1_stack[0],
                   sizeof(thread1_stack), 25, 5);
    rt_thread_startup(&thread1);

    rt_thread_init(&thread2,
                   "thread2",
                   thread2_entry,
                   RT_NULL,
                   &thread2_stack[0],
                   sizeof(thread2_stack), 25, 5);
    rt_thread_startup(&thread2);

    return 0;
}

/* 导出到 msh 命令列表中 */
MSH_CMD_EXPORT(msgq_sample, msgq sample);copymistakeCopy Success

The simulation results are as follows:

\ | /
- RT -     Thread Operating System
 / | \     3.1.0 build Aug 24 2018
 2006 - 2018 Copyright by rt-thread team
msh > msgq_sample
msh >thread2: send message - A
thread1: recv msg from msg queue, the content:A
thread2: send message - B
thread2: send message - C
thread2: send message - D
thread2: send message - E
thread1: recv msg from msg queue, the content:B
thread2: send message - F
thread2: send message - G
thread2: send message - H
thread2: send urgent message - I
thread2: send message - J
thread1: recv msg from msg queue, the content:I
thread2: send message - K
thread2: send message - L
thread2: send message - M
thread2: send message - N
thread2: send message - O
thread1: recv msg from msg queue, the content:C
thread2: send message - P
thread2: send message - Q
thread2: send message - R
thread2: send message - S
thread2: send message - T
thread1: recv msg from msg queue, the content:D
message queue stop send, thread2 quit
thread1: recv msg from msg queue, the content:E
thread1: recv msg from msg queue, the content:F
thread1: recv msg from msg queue, the content:G

thread1: recv msg from msg queue, the content:T
thread1: detach mqcopymistakeCopy Success

The example demonstrates how to use the message queue. Thread 1 will receive messages from the message queue; Thread 2 will periodically send ordinary messages and urgent messages to the message queue. Since the message "I" sent by Thread 2 is an urgent message, it will be directly inserted into the head of the message queue. Therefore, after receiving message "B", Thread 1 will receive the urgent message and then receive message "C".

Message queues can be used to send messages of indefinite length, including message exchange between threads, and sending messages to threads in interrupt service routines (interrupt service routines cannot receive messages). The following is an introduction to the use of message queues in two parts: sending messages and synchronizing messages.

Send Message

The obvious difference between a message queue and a mailbox is that the length of a message is not limited to 4 bytes; in addition, a message queue also includes a function interface for sending urgent messages. However, when a message queue is created where the maximum length of all messages is 4 bytes, the message queue object will degenerate into a mailbox. This unlimited length message is also reflected in the code writing occasion, which is also similar to the mailbox code:

struct msg
{
    rt_uint8_t *data_ptr;    /* 数据块首地址 */
    rt_uint32_t data_size;   /* 数据块大小   */
};copymistakeCopy Success

The message structure definition is the same as the mailbox example, assuming that such a message still needs to be sent to the receiving thread. In the mailbox example, this structure can only send a pointer to this structure (after the function pointer is sent, the receiving thread can correctly access the content pointed to by this address, and usually this data needs to be left for the receiving thread to release). The way to use a message queue is very different:

void send_op(void *data, rt_size_t length)
{
    struct msg msg_ptr;

    msg_ptr.data_ptr = data;  /* 指向相应的数据块地址 */
    msg_ptr.data_size = length; /* 数据块的长度 */

    /* 发送这个消息指针给 mq 消息队列 */
    rt_mq_send(mq, (void*)&msg_ptr, sizeof(struct msg));
}copymistakeCopy Success

Note that in the above code, the data content of a local variable is sent to the message queue. In the receiving thread, the structure of the message receiving is also using a local variable:

void message_handler()
{
    struct msg msg_ptr; /* 用于放置消息的局部变量 */

    /* 从消息队列中接收消息到 msg_ptr 中 */
    if (rt_mq_recv(mq, (void*)&msg_ptr, sizeof(struct msg), RT_WAITING_FOREVER) > 0)
    {
        /* 成功接收到消息,进行相应的数据处理 */
    }
}copymistakeCopy Success

Because the message queue directly copies the data content, in the above examples, local variables are used to save the message structure, which avoids the trouble of dynamic memory allocation (there is no need to worry that the message memory space has been released when the receiving thread receives the message).

Synchronous Messages

In general system design, we often encounter the problem of sending synchronous messages. At this time, we can choose the corresponding implementation according to the different states at that time: the two threads can be implemented in the form of [message queue + semaphore or mailbox] . The sending thread sends the corresponding message to the message queue in the form of message sending. After sending, it hopes to obtain the confirmation of receipt from the receiving thread. The working diagram is shown in the following figure:

Depending on the message confirmation, the message structure can be defined as:

struct msg
{
    /* 消息结构其他成员 */
    struct rt_mailbox ack;
};
/* 或者 */
struct msg
{
    /* 消息结构其他成员 */
    struct rt_semaphore ack;
};copymistakeCopy Success

The first type of message uses a mailbox as a confirmation mark, while the second type of message uses a semaphore as a confirmation mark. The mailbox as a confirmation mark means that the receiving thread can notify the sending thread of some status values; while the semaphore as a confirmation mark can only notify the sending thread that the message has been confirmed to be received.

Signals (also called soft interrupt signals) are a simulation of the interrupt mechanism at the software level. In principle, a thread receiving a signal is similar to a processor receiving an interrupt request.

Signals are used for asynchronous communication in RT-Thread. The POSIX standard defines the sigset_t type to define a signal set. However, the sigset_t type may be defined differently in different systems. In RT-Thread, sigset_t is defined as an unsigned long type and named rt_sigset_t. The signals that applications can use are SIGUSR1 (10) and SIGUSR2 (12).

The essence of the signal is a soft interrupt, which is used to notify the thread of asynchronous events and is used for exception notification and emergency handling between threads. A thread does not have to wait for the arrival of a signal through any operation. In fact, the thread does not know when the signal will arrive. Threads can send soft interrupt signals to each other by calling rt_thread_kill().

The threads that receive the signals have different processing methods for various signals, which can be divided into three categories:

The first is an interrupt-like handler. For signals that need to be processed, the thread can specify a processing function, which will be used to handle the signal.

The second method is to ignore a signal and do nothing about it, as if it had never happened.

The third method is to retain the system default value for processing the signal.

As shown in the figure below, assuming that thread 1 needs to process a signal, thread 1 first installs a signal and unblocks it, and sets the exception handling method for the signal while installing it; then other threads can send signals to thread 1, triggering thread 1 to process the signal.

When the signal is passed to thread 1, if it is in the suspended state, it will change its state to the ready state to handle the corresponding signal. If it is in the running state, a new stack frame space will be created based on its current thread stack to handle the corresponding signal. It should be noted that the thread stack size used will also increase accordingly.

There are several types of signal operations: installing signals, blocking signals, unblocking signals, sending signals, and waiting for signals. The signal interface is shown in the figure below:

Install signal

If a thread wants to process a signal, it must install the signal in the thread. Installing a signal is mainly used to determine the mapping relationship between the signal value and the thread's action for the signal value, that is, which signal the thread will process and what action will be performed when the signal is passed to the thread. For detailed definition, please see the following code:

rt_sighandler_t rt_signal_install(int signo, rt_sighandler_t[] handler);copymistakeCopy Success

rt_sighandler_t is the function pointer type that defines the signal processing function. The following table describes the input parameters and return values ​​of the function:

Setting the handler parameter when installing a signal determines the different processing methods for the signal. The processing methods can be divided into three types:

1) Similar to the interrupt processing method, the parameter points to the user-defined processing function when the signal occurs, which is processed by this function.

2) The parameter is set to SIG_IGN to ignore a signal and do nothing with it, as if it had never happened.

3) If the parameter is set to SIG_DFL, the system will call the default processing function _signal_default_handler().

Blocking Signals

Signal blocking can also be understood as masking signals. If the signal is blocked, it will not be delivered to the thread that installed the signal, nor will it trigger soft interrupt processing. Calling rt_signal_mask() can block the signal:

void rt_signal_mask(int signo);copymistakeCopy Success

The following table describes the input parameters of this function:

Unblocking signals

Several signals can be installed in a thread. This function can be used to pay attention to some of the signals, so sending these signals will trigger a soft interrupt for the thread. Calling rt_signal_unmask() can be used to unblock the signal:

void rt_signal_unmask(int signo);copymistakeCopy Success

The following table describes the input parameters of this function:

rt_signal_unmask() Function Parameters

Sending a signal

When exception handling is required, a signal can be sent to the thread that is set to handle the exception. Calling rt_thread_kill() can be used to send a signal to any thread:

int rt_thread_kill(rt_thread_t tid, int sig);copymistakeCopy Success

The following table describes the input parameters and return values ​​of this function:

Waiting for the signal

Wait for the set signal to arrive. If the signal is not received, the thread will be suspended until the signal is received or the waiting time exceeds the specified timeout. If the signal is received, the pointer to the signal body is stored in si. The following is the function of waiting for the signal.

int rt_signal_wait(const rt_sigset_t *set,
                        rt_siginfo_t[] *si, rt_int32_t timeout);copymistakeCopy Success

rt_siginfo_t is the data type that defines the signal information. The following table describes the input parameters and return values ​​of the function:

This is a signal application routine, as shown in the following code. This routine creates a thread. When installing the signal, the signal processing method is set to custom processing, and the signal processing function defined is thread1_signal_handler(). After this thread is running and the signal is installed, send a signal to this thread. This thread will receive the signal and print information.

Signal usage routine

#include <rtthread.h>

#define THREAD_PRIORITY         25
#define THREAD_STACK_SIZE       512
#define THREAD_TIMESLICE        5

static rt_thread_t tid1 = RT_NULL;

/* 线程 1 的信号处理函数 */
void thread1_signal_handler(int sig)
{
    rt_kprintf("thread1 received signal %d\n", sig);
}

/* 线程 1 的入口函数 */
static void thread1_entry(void *parameter)
{
    int cnt = 0;

    /* 安装信号 */
    rt_signal_install(SIGUSR1, thread1_signal_handler);
    rt_signal_unmask(SIGUSR1);

    /* 运行 10 次 */
    while (cnt < 10)
    {
        /* 线程 1 采用低优先级运行,一直打印计数值 */
        rt_kprintf("thread1 count : %d\n", cnt);

        cnt++;
        rt_thread_mdelay(100);
    }
}

/* 信号示例的初始化 */
int signal_sample(void)
{
    /* 创建线程 1 */
    tid1 = rt_thread_create("thread1",
                            thread1_entry, RT_NULL,
                            THREAD_STACK_SIZE,
                            THREAD_PRIORITY, THREAD_TIMESLICE);

    if (tid1 != RT_NULL)
        rt_thread_startup(tid1);

    rt_thread_mdelay(300);

    /* 发送信号 SIGUSR1 给线程 1 */
    rt_thread_kill(tid1, SIGUSR1);

    return 0;
}

/* 导出到 msh 命令列表中 */
MSH_CMD_EXPORT(signal_sample, signal sample);copymistakeCopy Success

The simulation results are as follows:

 \ | /
- RT -     Thread Operating System
 / | \     3.1.0 build Aug 24 2018
 2006 - 2018 Copyright by rt-thread team
msh >signal_sample
thread1 count : 0
thread1 count : 1
thread1 count : 2
msh >thread1 received signal 10
thread1 count : 3
thread1 count : 4
thread1 count : 5
thread1 count : 6
thread1 count : 7
thread1 count : 8
thread1 count : 9copymistakeCopy Success

In the routine, first the thread installs the signal and unblocks it, then sends the signal to the thread. The thread receives the signal and prints the received signal: SIGUSR1 (10).

Last updated

Assoc. Prof. Wiroon Sriborrirux, Founder of Advance Innovation Center (AIC) and Bangsaen Design House (BDH), Electrical Engineering Department, Faculty of Engineering, Burapha University