POSIX message queues in Linux

  • Post author:
  • Post last modified:August 14, 2024
  • Reading time:16 mins read

POSIX Message queues

POSIX interprocess comunication (IPC) was introduced in the POSIX.1b standard (IEEE Std 1003.1b-1993) for real time extensions. POSIX message queues have been made available in Linux since the version 2.6.6 (May 2004). POSIX IPC calls are as per the standard but may not be available on older Unix-like systems. Compared with the System V IPC calls, the POSIX IPC calls have a cleaner interface and are easier to use.

1.0 POSIX Message queue naming in Linux

System V message queues are identified using keys obtained with the ftok function call. POSIX message queues are identified using name strings. On Linux, POSIX queues are named as string starting with a forward slash (/) followed by one or more characters, none of which is a slash and ending with the null character. Any process knowing the queue name and having appropriate permissions can send or receive messages from the queue and also do other operations on it.

2.0 POSIX Message queue calls

Programs using POSIX message queues on Linux must link with the real-time library, librt using the compiler option -lrt. The function call names start with the prefix, mq_.

2.1 mq_open, mq_close

#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>

mqd_t mq_open (const char *name, int oflag);
mqd_t mq_open (const char *name, int oflag, mode_t mode,
               struct mq_attr *attr);

The mq_open function is for opening a POSIX queue. The first parameter specifies the queue name which is as described in an earlier paragraph above. The second parameter is a flag which can be O_RDONLY for receiving messages, O_WRONLY, for sending messages and O_RDWR for both send and receive operations on the queue. More values can be OR’ed to this flag. You can specify O_NONBLOCK to use the queue in a non-blocking mode. By default, mq_send would block if the queue was full and mq_receive would block if there was no message in the queue. But if O_NONBLOCK is specified in oflag, the call would return in those cases immediately with errno set to EAGAIN.

If you specify O_CREAT as a part of oflag, the queue is created, if it does not, already, exist. If you specify O_EXCL along with O_CREAT, and the queue exists, mq_open fails with errno set to EEXIST. If O_CREAT is specified in oflag, the second form of mq_open has to be used with two additional parameters. In that case, mode specifies permissions for the queue and the pointer to struct mq_attr gives the attributes for the message queue. If this pointer is NULL, a queue with default attributes is created.

struct mq_attr {
    long mq_flags;       /* Flags: 0 or O_NONBLOCK */
    long mq_maxmsg;      /* Max. # of messages on queue */
    long mq_msgsize;     /* Max. message size (bytes) */ 
    long mq_curmsgs;     /* # of messages currently in queue */
};         

The value of mq_maxmsg in the structure whose pointer is passed as attr in mq_open should be less than or equal to that in the /proc interface file, /proc/sys/fs/mqueue/msg_max (default value in the file is 10). Similarly the value of mq_msgsize should be less than that in the file, /proc/sys/fs/mqueue/msgsize_max, the default value in the file being 8192 bytes. These limits are ignored for privileged processes.

If the mq_open call is successful, a message queue descriptor is returned. The message queue descriptor can be used in subsequent calls for the queue.

The mq_close calls is,

#include <mqueue.h>

int mq_close (mqd_t mqdes);

The mq_close call closes the message queue descriptor, mqdes.

2.2 mq_timed_send, mq_send, mq_timed_receive, mq_receive

#include <mqueue.h>

int mq_send (mqd_t mqdes, const char *msg_ptr, size_t msg_len, 
             unsigned int msg_prio);

mq_send is for sending a message to the queue referred by the descriptor mqdes. The msg_ptr points to the message buffer. msg_len is the size of the message, which should be less than or equal to the message size for the queue. msg_prio is the message priority, which is a non-negative number specifying the priority of the message. Messages are placed in the queue in the decreasing order of message priority, with the older messages for a priority coming before newer messages. If the queue is full, mq_send blocks till there is space on the queue, unless the O_NONBLOCK flag is enabled for the message queue, in which case mq_send returns immediately with errno set to EAGAIN.

#include <time.h>
#include <mqueue.h>

int mq_timedsend (mqd_t mqdes, const char *msg_ptr, size_t msg_len, 
                  unsigned int msg_prio, const struct timespec *abs_timeout);

mq_timedsend works just like mq_send, except that if the queue is full and O_NONBLOCK flag is not specified, a time out occurs at the time pointed by abs_timeout and mq_timedsend returns. It is worth noting that time parameter is absolute time in seconds and nanoseconds since the Epoch, January 1, 1970, 00:00:00 +0000 UTC. Also, if the queue is full and time specified has already passed, mq_timedsend returns immediately. The structure for specifying the timeout is,

struct timespec {
    time_t tv_sec;        /* seconds */ 
    long   tv_nsec;       /* nanoseconds */
};         

Next, we have the mq_receive and mq_timedreceive calls for receiving messages.

#include <mqueue.h>

ssize_t mq_receive (mqd_t mqdes, char *msg_ptr, size_t msg_len, 
                    unsigned int *msg_prio);

mq_receive receives a message from the queue referred by the descriptor mqdes. The oldest of the highest priority is deleted from the queue and passed to the process in the buffer pointed by msg_ptr. msg_len is the length of buffer in bytes and it must be greater than the maximum message size, the mq_msgsize attribute, for the queue. If the pointer msg_prio is not null, the priority of the received message is stored in the integer pointed by it. The default behavior of mq_receive is to block if there is no message in the queue. However, if the O_NONBLOCK flag is enabled for the queue, and the queue is empty, mq_receive returns immediately with errno set to EAGAIN. On success, mq_receive returns the number of bytes received in the buffer pointed by msg_ptr.

#include <time.h>
#include <mqueue.h>

ssize_t mq_timedreceive (mqd_t mqdes, char *msg_ptr, size_t msg_len, 
                         unsigned int *msg_prio, 
                         const struct timespec *abs_timeout);

mq_timedreceive is identical to mq_receive, except that it has an additional parameter indicating a time out. In case O_NONBLOCK flag for the queue is not enabled and the queue is empty, then mq_timedreceive would return at the occurrence of time pointed by abs_timeout. As mentioned above, the time pointed by abs_timeout is an absolute time specified in number of seconds and nanoseconds since the Epoch, January 1, 1970, 00:00:00 +0000 UTC.

2.3 mq_notify

#include <mqueue.h>

int mq_notify (mqd_t mqdes, const struct sigevent *sevp);

mq_notify is for registering or unregistering for asynchronous notification of arrival of a message on an empty queue referred by mqdes.

2.4 mq_unlink

#include <mqueue.h>

int mq_unlink(const char *queue_name);

mq_unlink removes the queue with the name queue_name.

2.5 mq_getattr, mq_setattr

#include <mqueue.h>

int mq_getattr(mqd_t mqdes, struct mq_attr *attr);

int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr,
                    struct mq_attr *oldattr);

The mq_getattr function gets the attribute structure, struct mq_attr, for the message queue for descriptor, mqdes. Similarly, the function, mq_setattr is for setting the attributes of a queue. However, the only attribute that can be modified using mq_setattr is the O_NONBLOCK flag in mq_flags. Other fields in the structure pointed by newattr are ignored. If oldattr is not null, the previous values of queue attributes are returned in the structure pointed by it.

3.0 An Example: Client Server Communication using POSIX message queues in Linux

The example below demonstrates interprocess communication between a server and clients using POSIX message queues in Linux. The server manages token numbers, which could be seat numbers for a flight, or something similar. It is server’s job to give a token number to a client on request. In a typical scenario, there might be multiple clients requesting the server for token numbers. The server’s message queue name is known to clients. Each client has its own message queue, in which server posts responses. When a client sends a request, it sends its message queue name. The server opens client’s message queue and sends its response. The client picks up the response from its message queue and reads the token number in it. The process architecture looks like this.

Interprocess communication between client and server using message queues

3.1 Server

The server code is,

/*
 * server.c: Server program
 *           to demonstrate interprocess commnuication
 *           with POSIX message queues
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>

#define SERVER_QUEUE_NAME   "/sp-example-server"
#define QUEUE_PERMISSIONS 0660
#define MAX_MESSAGES 10
#define MAX_MSG_SIZE 256
#define MSG_BUFFER_SIZE MAX_MSG_SIZE + 10

int main (int argc, char **argv)
{
    mqd_t qd_server, qd_client;   // queue descriptors
    long token_number = 1; // next token to be given to client

    printf ("Server: Hello, World!\n");

    struct mq_attr attr;

    attr.mq_flags = 0;
    attr.mq_maxmsg = MAX_MESSAGES;
    attr.mq_msgsize = MAX_MSG_SIZE;
    attr.mq_curmsgs = 0;

    if ((qd_server = mq_open (SERVER_QUEUE_NAME, O_RDONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) {
        perror ("Server: mq_open (server)");
        exit (1);
    }
    char in_buffer [MSG_BUFFER_SIZE];
    char out_buffer [MSG_BUFFER_SIZE];

    while (1) {
        // get the oldest message with highest priority
        if (mq_receive (qd_server, in_buffer, MSG_BUFFER_SIZE, NULL) == -1) {
            perror ("Server: mq_receive");
            exit (1);
        }

        printf ("Server: message received.\n");

        // send reply message to client

        if ((qd_client = mq_open (in_buffer, O_WRONLY)) == 1) {
            perror ("Server: Not able to open client queue");
            continue;
        }

        sprintf (out_buffer, "%ld", token_number);

        if (mq_send (qd_client, out_buffer, strlen (out_buffer) + 1, 0) == -1) {
            perror ("Server: Not able to send message to client");
            continue;
        }

        printf ("Server: response sent to client.\n");
        token_number++;
    }
}

3.2 Client

The client code is,

/*
 * client.c: Client program
 *           to demonstrate interprocess communication
 *           with POSIX message queues
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>

#define SERVER_QUEUE_NAME   "/sp-example-server"
#define QUEUE_PERMISSIONS 0660
#define MAX_MESSAGES 10
#define MAX_MSG_SIZE 256
#define MSG_BUFFER_SIZE MAX_MSG_SIZE + 10

int main (int argc, char **argv)
{
    char client_queue_name [64];
    mqd_t qd_server, qd_client;   // queue descriptors


    // create the client queue for receiving messages from server
    sprintf (client_queue_name, "/sp-example-client-%d", getpid ());

    struct mq_attr attr;

    attr.mq_flags = 0;
    attr.mq_maxmsg = MAX_MESSAGES;
    attr.mq_msgsize = MAX_MSG_SIZE;
    attr.mq_curmsgs = 0;

    if ((qd_client = mq_open (client_queue_name, O_RDONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) {
        perror ("Client: mq_open (client)");
        exit (1);
    }

    if ((qd_server = mq_open (SERVER_QUEUE_NAME, O_WRONLY)) == -1) {
        perror ("Client: mq_open (server)");
        exit (1);
    }

    char in_buffer [MSG_BUFFER_SIZE];

    printf ("Ask for a token (Press <ENTER>): ");

    char temp_buf [10];

    while (fgets (temp_buf, 2, stdin)) {

        // send message to server
        if (mq_send (qd_server, client_queue_name, strlen (client_queue_name) + 1, 0) == -1) {
            perror ("Client: Not able to send message to server");
            continue;
        }

        // receive response from server

        if (mq_receive (qd_client, in_buffer, MSG_BUFFER_SIZE, NULL) == -1) {
            perror ("Client: mq_receive");
            exit (1);
        }
        // display token received from server
        printf ("Client: Token received from server: %s\n\n", in_buffer);

        printf ("Ask for a token (Press ): ");
    }


    if (mq_close (qd_client) == -1) {
        perror ("Client: mq_close");
        exit (1);
    }

    if (mq_unlink (client_queue_name) == -1) {
        perror ("Client: mq_unlink");
        exit (1);
    }
    printf ("Client: bye\n");

    exit (0);
}

3.3 Running the server and clients

The server and client programs need to be compiled with the -lrt option. First, the server is run. Then one or more clients can be run for testing. For each queue, a file is created in the /dev/mqueue directory (in Linux).

$ # server
$ gcc server.c -o server -lrt
$ gcc client.c -o client -lrt
$ ./server
Server: Hello, World!
Server: message received.
Server: response sent to client.
Server: message received.
Server: response sent to client.
Server: message received.
Server: response sent to client.
Server: message received.
Server: response sent to client.
...
...

Running the client,

$ ./client
Ask for a token (Press ): 
Client: Token received from server: 1

Ask for a token (Press ): 
Client: Token received from server: 2

Ask for a token (Press ): 
Client: Token received from server: 4

Ask for a token (Press ): Client: bye
$ ls -ls /dev/mqueue
total 0
0 -rw-r----- 1 user1 user1 80 Feb  8 14:54 sp-example-client-19393
0 -rw-r----- 1 user1 user1 80 Feb  8 14:54 sp-example-client-19419
0 -rw-r----- 1 user1 user1 80 Feb  8 14:54 sp-example-server

Server and two clients communicating with POSIX message queues in Linux

Figure: Screenshot showing server and two clients run from terminals in Linux. The fourth terminal shows the files for server and client queues in the /dev/mqueue directory.

4.0 See also

  1. Interprocess communication using System V message queues in Linux

Karunesh Johri

Software developer, working with C and Linux.
4.4 7 votes
Article Rating
Subscribe
Notify of
guest
0 Comments
Newest
Oldest Most Voted
Inline Feedbacks
View all comments