IPC Message Queues

A message queue is another commonly used inter-thread communication mechanism. It accepts variable-length messages from threads or interrupt service routines and buffers them in its own memory space. Other threads can then read messages from the queue. When the queue is empty, a reading thread can be suspended; when a new message arrives, the suspended thread is woken to receive and process it.

The main operations on a message queue are: create or open it with mq_open(), send a message with mq_send(), get a message with mq_receive(), and delete the queue with mq_unlink() when it is no longer in use.

POSIX message queues are mainly intended for inter-process communication. In RT-Thread, the POSIX message queue is a wrapper around the RT-Thread kernel message queue and is used mainly for communication between threads in the system. Its usage is similar to that of the kernel message queue.

Each message queue corresponds to a message queue control block. Before creating a message queue, you need to define a message queue control block. The message queue control block is defined in the mqueue.h header file.

struct mqdes
{
    rt_uint16_t refcount;  /* reference count */
    rt_uint16_t unlinked;  /* detached state of the message queue; 1 means the queue has been detached */
    rt_mq_t mq;            /* RT-Thread message queue control block */
    struct mqdes* next;    /* pointer to the next message queue control block */
};
typedef struct mqdes* mqd_t;  /* pointer type for the message queue control block */

mqd_t mq_open(const char *name, int oflag, ...);

This function creates a new message queue or opens an existing one, identified by the name of the message queue. The possible values of oflag are 0, O_CREAT, or O_CREAT|O_EXCL. If oflag is O_CREAT, a new message queue is created if none with that name exists; otherwise the existing queue is opened. If oflag is O_CREAT|O_EXCL, NULL is returned if the message queue already exists, and a new message queue is created if it does not. If oflag is 0, NULL is returned if the message queue does not exist.
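The oflag cases can be summarized as a small decision function. The sketch below is only a model of the rules described above, not RT-Thread's implementation: the enum open_outcome and the function mq_open_outcome() are hypothetical names introduced for illustration, and queue_exists stands in for the name lookup. The case of O_CREAT with an already existing queue follows standard POSIX behaviour and is an assumption here.

```c
#include <fcntl.h>   /* O_CREAT, O_EXCL */

/* Hypothetical result codes; not part of mqueue.h. */
enum open_outcome
{
    OPEN_EXISTING,   /* the existing queue is opened */
    CREATE_NEW,      /* a new queue is created */
    OPEN_FAILS       /* mq_open() reports failure */
};

/* Model of the oflag rules. queue_exists is nonzero when a queue
 * with the requested name is already present. */
static enum open_outcome mq_open_outcome(int oflag, int queue_exists)
{
    if (queue_exists)
    {
        /* O_CREAT|O_EXCL refuses to reuse an existing queue. */
        if ((oflag & O_CREAT) && (oflag & O_EXCL))
            return OPEN_FAILS;
        return OPEN_EXISTING;
    }

    /* No queue yet: only O_CREAT allows one to be created. */
    return (oflag & O_CREAT) ? CREATE_NEW : OPEN_FAILS;
}
```

Read this way, O_CREAT|O_EXCL is the choice when the caller must be the one that creates the queue, while plain O_CREAT suits code that does not care who created it first.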

int mq_unlink(const char *name);

This function searches for a message queue by its name. If the queue is found, it is set to the detached state. If its reference count is 0, the queue is deleted and the resources it occupies are released.

int mq_close(mqd_t mqdes);

When a thread terminates, a close operation is performed on each message queue it holds, regardless of whether the thread terminates voluntarily or involuntarily. Closing a queue reduces its reference count by 1. If the count reaches 0 and the queue is in the detached state, the mqdes message queue is deleted and the resources it occupies are released.
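The interplay between mq_unlink() and mq_close() is easier to follow as a small state model. The sketch below is an illustration under stated assumptions, not the RT-Thread source: struct mq_state and the model_*() functions are hypothetical names that mimic only the refcount and unlinked fields of struct mqdes, and the deleted flag stands in for releasing the queue's resources.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the bookkeeping fields of struct mqdes. */
struct mq_state
{
    unsigned refcount;  /* number of current holders */
    bool unlinked;      /* set once the queue has been detached */
    bool deleted;       /* true once resources would be released */
};

/* Delete only when nobody holds the queue and it has been detached. */
static void model_try_delete(struct mq_state *s)
{
    if (s->refcount == 0 && s->unlinked)
        s->deleted = true;
}

/* Model of opening the queue: one more holder. */
static void model_open(struct mq_state *s)
{
    assert(!s->deleted);
    s->refcount++;
}

/* Model of mq_close(): one holder fewer, then check for deletion. */
static void model_close(struct mq_state *s)
{
    assert(s->refcount > 0);
    s->refcount--;
    model_try_delete(s);
}

/* Model of mq_unlink(): mark as detached, then check for deletion. */
static void model_unlink(struct mq_state *s)
{
    s->unlinked = true;
    model_try_delete(s);
}
```

In this model the order of the two calls does not matter: the queue disappears only after it has been detached and the last holder has closed it.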

int mq_send(mqd_t mqdes,
            const char *msg_ptr,
            size_t msg_len,
            unsigned msg_prio);

This function sends a message to the mqdes message queue and is a wrapper around rt_mq_send(). It adds the message pointed to by msg_ptr to the queue. The message length msg_len must be less than or equal to the maximum message length set when the queue was created.

If the message queue is full, that is, the number of queued messages equals the maximum number of messages, the thread or interrupt service routine sending the message receives an error code (-RT_EFULL).
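The two send-side checks (message length and queue capacity) can be illustrated with a tiny bounded FIFO. This is a simplified model for explanation only, not how RT-Thread stores messages: struct demo_queue, demo_send(), demo_receive() and the DEMO_* limits are all hypothetical, and the model returns -1 where a real receiver would block.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define DEMO_MAXMSG  2   /* hypothetical maximum number of messages */
#define DEMO_MSGSIZE 8   /* hypothetical maximum message length */

struct demo_queue
{
    char   slots[DEMO_MAXMSG][DEMO_MSGSIZE];
    size_t lengths[DEMO_MAXMSG];
    size_t head;   /* index of the oldest message */
    size_t count;  /* messages currently stored */
};

/* Returns 0 on success, -1 if the message is too long or the queue is full. */
static int demo_send(struct demo_queue *q, const char *msg, size_t len)
{
    assert(q != NULL && msg != NULL);
    if (len > DEMO_MSGSIZE)
        return -1;                 /* exceeds the maximum message length */
    if (q->count == DEMO_MAXMSG)
        return -1;                 /* queue full: the sender gets an error */

    size_t tail = (q->head + q->count) % DEMO_MAXMSG;
    memcpy(q->slots[tail], msg, len);
    q->lengths[tail] = len;
    q->count++;
    return 0;
}

/* Copies out the oldest message; returns its length, or -1 if the
 * queue is empty or buf is too small. */
static long demo_receive(struct demo_queue *q, char *buf, size_t cap)
{
    assert(q != NULL && buf != NULL);
    if (q->count == 0 || cap < q->lengths[q->head])
        return -1;

    size_t len = q->lengths[q->head];
    memcpy(buf, q->slots[q->head], len);
    q->head = (q->head + 1) % DEMO_MAXMSG;
    q->count--;
    return (long)len;
}
```

A sender that gets the "full" error typically waits and retries, which is what the sending threads in the sample program do with sleep(1).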

int mq_timedsend(mqd_t mqdes,
                const char *msg_ptr,
                size_t msg_len,
                unsigned msg_prio,
                const struct timespec *abs_timeout);

RT-Thread currently does not support sending messages with a specified blocking time. The function interface is provided, but calling it is equivalent to calling mq_send().

ssize_t mq_receive(mqd_t mqdes,
                  char *msg_ptr,
                  size_t msg_len,
                  unsigned *msg_prio);

This function removes the oldest message from the mqdes message queue and copies it into the memory pointed to by msg_ptr. If the queue is empty, the thread calling mq_receive() blocks until a message becomes available.
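A received message is a block of bytes, so printing it with "%s" is safe only if the sender included the terminating '\0'. The helper below is a defensive sketch: terminate_message() is a hypothetical name, not part of mqueue.h, and it assumes that the return value of mq_receive() is the received length in bytes on success and negative on failure, as in standard POSIX.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/types.h>  /* ssize_t */

/* Hypothetical helper: make a received payload safe for "%s".
 * buf and cap describe the buffer passed to mq_receive(); n is the
 * value mq_receive() returned. Returns the index at which the
 * terminator was written, or -1 if n signalled an error. */
static ssize_t terminate_message(char *buf, size_t cap, ssize_t n)
{
    assert(buf != NULL && cap > 0);
    if (n < 0)
        return -1;            /* receive failed; leave the buffer alone */

    size_t len = (size_t)n;
    if (len >= cap)
        len = cap - 1;        /* truncate so the terminator fits */
    buf[len] = '\0';
    return (ssize_t)len;
}
```

A receiver could call it as terminate_message(buf, sizeof(buf), mq_receive(mqueue, buf, sizeof(buf), NULL)) and print buf only when the result is not -1.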

ssize_t mq_timedreceive(mqd_t mqdes,
                        char *msg_ptr,
                        size_t msg_len,
                        unsigned *msg_prio,
                        const struct timespec *abs_timeout);

The difference between this function and mq_receive() is that, if the message queue is empty, the thread blocks only until the timeout specified by abs_timeout expires. After the timeout, the function returns -1 and the thread is woken from the blocked state into the ready state.
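In standard POSIX, abs_timeout is an absolute point in time on CLOCK_REALTIME rather than a duration, so callers usually compute "now plus N milliseconds". The sketch below shows one way to build such a value. timespec_add_ms() and deadline_from_now() are hypothetical helper names, and whether a given RT-Thread version interprets abs_timeout in exactly this way is an assumption that should be checked against its source.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L  /* for clock_gettime() on strict compilers */
#endif
#include <assert.h>
#include <time.h>

/* Hypothetical helper: return base advanced by ms milliseconds,
 * keeping tv_nsec in the range [0, 1000000000). */
static struct timespec timespec_add_ms(struct timespec base, long ms)
{
    assert(ms >= 0);
    base.tv_sec  += ms / 1000;
    base.tv_nsec += (ms % 1000) * 1000000L;
    if (base.tv_nsec >= 1000000000L)
    {
        /* Carry the overflow into the seconds field. */
        base.tv_sec  += 1;
        base.tv_nsec -= 1000000000L;
    }
    return base;
}

/* Hypothetical helper: build a deadline ms milliseconds from now.
 * Returns 0 on success, -1 if the clock could not be read. */
static int deadline_from_now(long ms, struct timespec *out)
{
    struct timespec now;

    if (clock_gettime(CLOCK_REALTIME, &now) != 0)
        return -1;
    *out = timespec_add_ms(now, ms);
    return 0;
}
```

The resulting struct timespec would then be passed by address as the abs_timeout argument of mq_timedreceive().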

This program creates three threads: thread 1 receives messages from the message queue, while thread 2 and thread 3 send messages to it.

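#include <fcntl.h>    /* O_CREAT */
#include <mqueue.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>   /* memset() */
#include <unistd.h>   /* sleep() */

/* Thread control blocks */
static pthread_t tid1;
static pthread_t tid2;
static pthread_t tid3;
/* Message queue handle */
static mqd_t mqueue;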

/* Check a function's return value and report the result */
static void check_result(const char* str, int result)
{
    if (0 == result)
    {
        printf("%s successfully!\n", str);
    }
    else
    {
        printf("%s failed! error code is %d\n", str, result);
    }
}
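/* Entry function of thread 1 */
static void* thread1_entry(void* parameter)
{
    char buf[128];
    int result;

    while (1)
    {
        /* Receive a message from the message queue; the priority is not needed */
        result = mq_receive(mqueue, &buf[0], sizeof(buf), NULL);
        if (result != -1)
        {
            /* Print the content; the senders include the terminating '\0' */
            printf("thread1 recv [%s]\n", buf);
        }

        /* Optionally sleep for 1 second */
        // sleep(1);
    }
}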
/* Entry function of thread 2 */
static void* thread2_entry(void* parameter)
{
    int i, result;
    char buf[] = "message2 No.x";

    while (1)
    {
        for (i = 0; i < 10; i++)
        {
            /* Replace the trailing 'x' with the digit i */
            buf[sizeof(buf) - 2] = '0' + i;

            printf("thread2 send [%s]\n", buf);
            /* Send the message, including its '\0', to the message queue */
            result = mq_send(mqueue, &buf[0], sizeof(buf), 0);
            if (result == -1)
            {
                /* The message queue is full; delay for 1 s */
                printf("thread2:message queue is full, delay 1s\n");
                sleep(1);
            }
        }

        /* Sleep for 2 seconds */
        sleep(2);
    }
}
/* Entry function of thread 3 */
static void* thread3_entry(void* parameter)
{
    int i, result;
    char buf[] = "message3 No.x";

    while (1)
    {
        for (i = 0; i < 10; i++)
        {
            /* Replace the trailing 'x' with the digit i */
            buf[sizeof(buf) - 2] = '0' + i;

            printf("thread3 send [%s]\n", buf);
            /* Send the message, including its '\0', to the message queue */
            result = mq_send(mqueue, &buf[0], sizeof(buf), 0);
            if (result == -1)
            {
                /* The message queue is full; delay for 1 s */
                printf("thread3:message queue is full, delay 1s\n");
                sleep(1);
            }
        }

        /* Sleep for 2 seconds */
        sleep(2);
    }
}
/* User application entry */
int rt_application_init()
{
    int result;
    struct mq_attr mqstat;
#define MSG_SIZE    128
#define MAX_MSG     128
    memset(&mqstat, 0, sizeof(mqstat));
    mqstat.mq_maxmsg = MAX_MSG;
    mqstat.mq_msgsize = MSG_SIZE;
    mqstat.mq_flags = 0;
    mqueue = mq_open("mqueue1", O_CREAT, 0777, &mqstat);
    /* As described for mq_open(), NULL indicates failure */
    if (mqueue == NULL)
    {
        printf("mq_open failed!\n");
        return -1;
    }

    /* Create thread 1: entry is thread1_entry, attributes NULL (defaults), entry parameter NULL */
    result = pthread_create(&tid1, NULL, thread1_entry, NULL);
    check_result("thread1 created", result);

    /* Create thread 2: entry is thread2_entry, attributes NULL (defaults), entry parameter NULL */
    result = pthread_create(&tid2, NULL, thread2_entry, NULL);
    check_result("thread2 created", result);

    /* Create thread 3: entry is thread3_entry, attributes NULL (defaults), entry parameter NULL */
    result = pthread_create(&tid3, NULL, thread3_entry, NULL);
    check_result("thread3 created", result);

    return 0;
}

Assoc. Prof. Wiroon Sriborrirux, Founder of Advance Innovation Center (AIC) and Bangsaen Design House (BDH), Electrical Engineering Department, Faculty of Engineering, Burapha University