MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Linux C语言消息队列发送与接收

2021-11-231.8k 阅读

消息队列基础概念

什么是消息队列

在Linux环境下,消息队列是一种进程间通信(IPC,Inter - Process Communication)的机制。它允许不同的进程通过发送和接收消息来进行数据交换。消息队列就像是一个存放消息的容器,进程可以向这个容器中添加消息,也可以从中取出消息。与管道等其他IPC机制不同,消息队列中的消息是有类型的,接收方可以根据消息类型有选择地接收消息,而不是像管道那样只能按顺序接收数据。

消息队列的特点

  1. 有类型:消息队列中的每个消息都带有一个类型字段,这使得接收进程能够根据自身需求,只接收特定类型的消息。例如,在一个多模块协作的系统中,不同模块可能发送不同类型的消息,接收模块可以根据类型来处理对应的消息,而忽略其他不关心的消息。
  2. 异步通信:发送进程和接收进程不需要同时运行。发送进程可以随时将消息发送到消息队列中,即使接收进程暂时没有运行,消息也会在队列中等待。当接收进程启动并准备好接收消息时,它可以从队列中取出消息进行处理。这种异步特性提高了进程间通信的灵活性和系统的整体效率。
  3. 可靠性:消息队列在一定程度上保证了消息的可靠性。一旦消息被成功发送到队列中,除非被接收进程取出或队列被删除,否则消息会一直存在。这与一些基于网络的通信方式不同,网络通信可能会因为网络故障等原因导致消息丢失。

消息队列在系统中的位置与作用

消息队列作为Linux内核提供的一种IPC机制,处于用户进程和内核之间。用户进程通过系统调用与内核进行交互,实现消息的发送和接收。在实际应用中,消息队列常用于分布式系统、多模块协作的软件架构等场景。例如,在一个监控系统中,各个监控节点可以将采集到的数据以消息的形式发送到消息队列,而数据处理模块则从消息队列中取出这些消息进行分析和存储。

消息队列相关系统调用

msgget函数

  1. 函数原型
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgget(key_t key, int msgflg);
  1. 参数说明
    • key:是一个键值,它是消息队列的标识符。通常可以使用ftok函数生成一个唯一的键值。如果keyIPC_PRIVATE,则会创建一个私有的消息队列,只有创建该队列的进程及其子进程可以访问。
    • msgflg:是一组标志位,用于指定消息队列的创建和访问权限。例如,IPC_CREAT表示如果消息队列不存在则创建它;IPC_EXCLIPC_CREAT一起使用时,如果消息队列已经存在,则msgget函数会失败返回。常见的权限位还包括S_IRUSR(用户读权限)、S_IWUSR(用户写权限)等,它们类似于文件的权限设置。
  2. 返回值 成功时,msgget返回一个非负整数,即消息队列的标识符(msgid),后续的消息队列操作(如发送和接收消息)将使用这个标识符。失败时,返回 -1,并设置errno以指示错误原因,常见的错误有EEXIST(消息队列已存在且使用了IPC_EXCL | IPC_CREAT)、ENOENT(消息队列不存在且未使用IPC_CREAT)等。

msgsnd函数

  1. 函数原型
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
  1. 参数说明
    • msqid:是由msgget函数返回的消息队列标识符。
    • msgp:是一个指向消息结构体的指针。消息结构体必须以一个长整型成员开始,用于指定消息类型,后面可以跟着其他需要发送的数据。例如:
struct mymsgbuf {
    long mtype;
    char mtext[256];
};
  • msgsz:指定要发送的消息正文部分(不包括消息类型字段mtype)的长度,以字节为单位。
  • msgflg:标志位。如果设置为IPC_NOWAIT,当消息队列满时,msgsnd函数不会阻塞,而是立即返回 -1,同时errno设置为EAGAIN。如果不设置该标志,当消息队列满时,msgsnd函数会阻塞,直到有足够的空间或者队列被删除。
  1. 返回值 成功时,msgsnd返回0。失败时,返回 -1,并设置errno,常见错误有EAGAIN(消息队列满且设置了IPC_NOWAIT)、EINVAL(无效的消息队列标识符或消息大小超过限制)等。

msgrcv函数

  1. 函数原型
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
  1. 参数说明
    • msqid:同样是消息队列标识符。
    • msgp:指向一个用于存储接收消息的结构体指针,其结构应与发送端的消息结构体一致。
    • msgsz:指定接收缓冲区的大小(不包括消息类型字段mtype)。
    • msgtyp:指定要接收的消息类型。如果msgtyp为0,则接收队列中的第一条消息;如果msgtyp大于0,则接收类型等于msgtyp的第一条消息;如果msgtyp小于0,则接收类型小于或等于msgtyp绝对值的第一条消息中类型最小的消息。
    • msgflg:标志位。IPC_NOWAIT的作用与msgsnd中的类似,如果设置该标志,当没有符合条件的消息时,msgrcv函数不会阻塞,而是立即返回 -1,errno设置为ENOMSGMSG_NOERROR标志如果设置,当接收到的消息正文长度大于msgsz时,消息正文将被截断到msgsz字节,而不会返回错误。
  2. 返回值 成功时,msgrcv返回接收到的消息正文部分(不包括mtype)的长度,以字节为单位。失败时,返回 -1,并设置errno,常见错误有ENOMSG(没有符合条件的消息且设置了IPC_NOWAIT)、EINVAL(无效的消息队列标识符等)。

msgctl函数

  1. 函数原型
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgctl(int msqid, int cmd, struct msqid_ds *buf);
  1. 参数说明
    • msqid:消息队列标识符。
    • cmd:指定要执行的操作。常见的操作有IPC_STAT(获取消息队列的状态信息,将信息存储在buf指向的结构体中)、IPC_SET(设置消息队列的状态信息,信息来源于buf指向的结构体)、IPC_RMID(删除消息队列)。
    • buf:是一个指向msqid_ds结构体的指针,该结构体用于存储或设置消息队列的状态信息。在IPC_STAT操作中,内核会填充该结构体;在IPC_SET操作中,进程会填充该结构体并传递给内核。
  2. 返回值 成功时,msgctl返回0。失败时,返回 -1,并设置errno,常见错误有EINVAL(无效的消息队列标识符或无效的操作命令)等。

示例代码分析

发送端代码示例

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define MAX_TEXT 512

struct mymsgbuf {
    long mtype;
    char mtext[MAX_TEXT];
};

int main() {
    int msqid;
    key_t key;
    struct mymsgbuf sbuf;
    size_t buf_length;

    // 使用ftok函数生成一个唯一的键值
    if ((key = ftok(".", 'a')) == -1) {
        perror("ftok");
        exit(1);
    }

    // 创建消息队列
    if ((msqid = msgget(key, 0644 | IPC_CREAT)) == -1) {
        perror("msgget");
        exit(1);
    }

    sbuf.mtype = 1;
    printf("Enter a message to send: ");
    fgets(sbuf.mtext, MAX_TEXT, stdin);
    // 去除fgets读取到的换行符
    sbuf.mtext[strcspn(sbuf.mtext, "\n")] = '\0';

    buf_length = strlen(sbuf.mtext) + 1;

    // 发送消息
    if (msgsnd(msqid, &sbuf, buf_length, 0) == -1) {
        perror("msgsnd");
        exit(1);
    }

    printf("Message sent successfully.\n");

    return 0;
}
  1. 代码解析
    • 首先,定义了一个消息结构体mymsgbuf,它包含一个mtype字段用于指定消息类型,以及一个mtext数组用于存储消息正文。
    • 使用ftok函数根据当前目录(.)和字符'a'生成一个唯一的键值keyftok函数通过将路径名和项目ID相结合,生成一个系统范围内唯一的键值,这个键值将用于创建消息队列。
    • 调用msgget函数创建消息队列,权限设置为0644(用户可读可写,组和其他用户可读),并使用IPC_CREAT标志表示如果队列不存在则创建。
    • 初始化消息结构体sbufmtype为1,从标准输入读取用户输入的消息到sbuf.mtext,并使用strcspn函数去除fgets读取到的换行符。
    • 计算消息正文的长度buf_length,并调用msgsnd函数将消息发送到消息队列。如果发送失败,通过perror输出错误信息并退出程序。

接收端代码示例

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define MAX_TEXT 512

struct mymsgbuf {
    long mtype;
    char mtext[MAX_TEXT];
};

int main() {
    int msqid;
    key_t key;
    struct mymsgbuf rbuf;
    size_t buf_length;

    // 使用ftok函数生成与发送端相同的键值
    if ((key = ftok(".", 'a')) == -1) {
        perror("ftok");
        exit(1);
    }

    // 获取消息队列
    if ((msqid = msgget(key, 0644)) == -1) {
        perror("msgget");
        exit(1);
    }

    buf_length = MAX_TEXT;

    // 接收消息
    if (msgrcv(msqid, &rbuf, buf_length, 1, 0) == -1) {
        perror("msgrcv");
        exit(1);
    }

    printf("Received message: %s\n", rbuf.mtext);

    // 删除消息队列
    if (msgctl(msqid, IPC_RMID, NULL) == -1) {
        perror("msgctl");
        exit(1);
    }

    return 0;
}
  1. 代码解析
    • 同样定义了与发送端相同的消息结构体mymsgbuf
    • 使用与发送端相同的方式调用ftok函数生成键值key,以确保获取到与发送端相同的消息队列。
    • 调用msgget函数获取消息队列,权限设置为0644。这里没有使用IPC_CREAT标志,因为假设发送端已经创建了队列。
    • 调用msgrcv函数接收类型为1的消息。如果接收失败,通过perror输出错误信息并退出程序。
    • 成功接收消息后,打印出接收到的消息正文。
    • 最后,调用msgctl函数并使用IPC_RMID命令删除消息队列,以清理系统资源。如果删除失败,同样输出错误信息并退出。

消息队列使用中的注意事项

消息队列大小限制

  1. 系统限制 Linux系统对消息队列的大小有一定限制。这些限制包括单个消息的最大长度、消息队列中所有消息的总长度等。可以通过查看系统文档或使用sysctl命令来获取具体的限制值。例如,在一些系统中,可以通过sysctl -a | grep msg来查看与消息队列相关的参数,如kernel.msgmnb表示消息队列的最大字节数,kernel.msgmax表示单个消息的最大字节数。
  2. 影响及处理 如果发送的消息大小超过了系统限制,msgsnd函数会返回错误(通常是EINVAL)。在编写程序时,需要根据系统限制来合理设计消息结构体的大小。如果确实需要发送较大的数据,可以考虑将数据进行分块处理,拆分成多个符合限制的消息进行发送,接收端再进行组装。

消息队列的持久化与清理

  1. 持久化 消息队列在系统重启后默认会被删除,除非在创建时使用了特殊的机制(如使用共享内存来模拟持久化的消息队列)。这意味着如果希望消息队列在系统重启后仍然可用,需要自行实现相关的持久化逻辑,例如将消息队列的状态和内容存储到文件中,在系统重启后重新加载。
  2. 清理 及时清理不再使用的消息队列是良好的编程习惯。未清理的消息队列会占用系统资源,可能导致系统性能下降。在程序结束时,应该像接收端示例代码那样,调用msgctl函数并使用IPC_RMID命令删除消息队列。此外,如果一个进程异常终止而没有清理其创建的消息队列,可以使用ipcrm命令手动删除(例如ipcrm -q msqid,其中msqid是消息队列的标识符)。

并发访问问题

  1. 竞争条件 当多个进程同时访问消息队列时,可能会出现竞争条件。例如,一个进程正在向消息队列中发送消息,而另一个进程同时尝试删除该消息队列,这可能导致未定义的行为。为了避免竞争条件,可以使用信号量等同步机制。信号量可以用来控制对消息队列的访问,确保在同一时间只有一个进程能够对消息队列进行特定的操作(如发送、接收或删除)。
  2. 死锁情况 在多进程环境下,如果进程之间对消息队列的使用不当,还可能出现死锁。例如,进程A等待从消息队列中接收特定类型的消息,而进程B等待向该消息队列发送消息,同时进程B又在等待进程A完成某个操作后才发送消息,这就形成了死锁。为了避免死锁,需要合理设计进程间的通信逻辑,确保进程不会相互等待形成循环依赖。

错误处理

  1. 系统调用错误 在使用消息队列相关的系统调用(如msggetmsgsndmsgrcvmsgctl)时,要充分处理可能出现的错误。每个系统调用失败时都会设置errno变量,通过perror函数可以输出与errno对应的错误信息,帮助定位问题。例如,如果msgget返回 -1,通过perror("msgget")可以输出类似于“msgget: No such file or directory”(如果是因为指定的键值对应的文件或目录不存在导致的错误)的信息,从而明确错误原因。
  2. 逻辑错误 除了系统调用本身的错误,还可能存在逻辑上的错误。例如,接收端期望接收特定类型的消息,但发送端发送的消息类型与接收端期望的不符,这可能导致接收端无法正确接收到消息。在编写代码时,要仔细检查消息类型的设置和处理逻辑,确保发送端和接收端在消息类型上保持一致。

消息队列与其他IPC机制的比较

与管道的比较

  1. 数据格式
    • 管道:管道只能传输无格式的字节流数据。发送端将数据写入管道,接收端按顺序读取,数据没有类型之分。例如,在使用匿名管道进行父子进程通信时,父进程写入管道的字节序列,子进程只能按顺序读取,无法根据数据的类型进行有选择的读取。
    • 消息队列:消息队列中的消息是有类型的。如前面的示例代码,消息结构体中包含mtype字段,接收端可以根据mtype的值有选择地接收消息,这使得消息队列在数据处理上更加灵活,适用于需要根据不同类型数据进行不同处理的场景。
  2. 同步方式
    • 管道:管道通常是半双工的(匿名管道)或全双工的(命名管道),但数据的传输是同步的。即发送端写入数据后,必须等待接收端读取数据,否则管道可能会被填满导致发送端阻塞。例如,在匿名管道中,如果接收端没有及时读取数据,发送端调用write函数写入数据时,当管道满时会阻塞,直到接收端读取数据腾出空间。
    • 消息队列:消息队列是异步的。发送端将消息发送到队列后,不需要等待接收端立即接收,消息会在队列中等待。接收端可以根据自身的状态和需求,在合适的时间从队列中取出消息,这提高了进程间通信的灵活性,特别适用于发送端和接收端处理速度不一致的场景。
  3. 生命周期
    • 管道:匿名管道的生命周期与创建它的进程及其子进程相关,当这些进程结束时,管道自动销毁。命名管道在文件系统中有对应的节点,其生命周期独立于进程,但如果对应的文件节点被删除,管道也就不能再使用。
    • 消息队列:消息队列的生命周期独立于进程,除非显式地调用msgctl函数并使用IPC_RMID命令删除,否则消息队列会一直存在于系统中,即使创建它的进程已经结束。

与共享内存的比较

  1. 数据访问方式
    • 共享内存:共享内存允许多个进程直接访问同一块内存区域,数据的读写操作直接在内存中进行,因此数据传输速度非常快。但是,由于多个进程可以同时访问共享内存,需要使用同步机制(如信号量)来避免数据竞争和不一致问题。例如,在一个多进程的图形渲染系统中,多个进程可能共享一块内存来存储图形数据,每个进程在访问该内存时需要先获取信号量,以确保数据的一致性。
    • 消息队列:消息队列通过发送和接收消息的方式进行数据传递,数据的访问是间接的。发送端将消息发送到队列,接收端从队列中取出消息,不需要像共享内存那样担心多个进程同时访问同一块内存导致的数据竞争问题,但数据传输速度相对共享内存较慢,因为涉及到内核空间和用户空间的数据拷贝。
  2. 数据组织形式
    • 共享内存:共享内存中的数据没有特定的结构限制,进程可以根据自身需求在共享内存区域中定义和组织数据结构。例如,可以在共享内存中创建一个复杂的链表或树结构来存储数据。
    • 消息队列:消息队列中的消息有特定的结构,必须以一个长整型的消息类型字段开始,后面跟着消息正文。这种结构相对固定,在数据组织上没有共享内存那么灵活,但对于简单的消息传递场景更加适用。
  3. 使用场景
    • 共享内存:适用于对数据传输速度要求极高、需要频繁进行大量数据交换的场景,如实时数据处理、图形图像处理等。
    • 消息队列:适用于对数据传输的可靠性要求较高、数据量相对较小且需要根据消息类型进行有选择处理的场景,如分布式系统中的任务调度、监控系统中的数据采集与处理等。

与信号的比较

  1. 消息内容
    • 信号:信号是一种简单的通知机制,它本身携带的信息非常有限,通常只是一个信号编号,用于通知进程发生了某种特定的事件,如SIGINT(键盘中断信号)、SIGTERM(终止信号)等。进程接收到信号后,根据信号编号执行相应的信号处理函数。
    • 消息队列:消息队列可以传输复杂的结构化数据,消息结构体中可以包含各种类型的数据字段,如字符串、整数、结构体等。例如,在一个任务调度系统中,可以通过消息队列向工作进程发送包含任务详细信息(如任务ID、任务参数等)的消息。
  2. 通信方式
    • 信号:信号是异步的,它是一种事件驱动的通信方式。当某个事件发生(如用户按下Ctrl + C产生SIGINT信号)时,内核会向相应的进程发送信号,进程在接收到信号时会暂停当前的执行流程,转而执行信号处理函数。
    • 消息队列:虽然消息队列也是异步通信机制,但它更侧重于数据的传递。进程通过发送和接收消息来交换数据,接收端可以根据消息类型有选择地接收消息,并且消息会在队列中排队等待处理,而信号更像是一种简单的通知,不适合用于大量数据的传递。
  3. 应用场景
    • 信号:主要用于处理系统事件、进程间的简单通知和异常处理等场景。例如,当用户希望终止一个进程时,可以发送SIGTERM信号;当进程发生错误需要进行清理操作时,可以发送特定的信号并在信号处理函数中进行清理。
    • 消息队列:广泛应用于进程间的数据交换和协作场景,如分布式系统中的组件通信、多模块软件中的任务分配等,这些场景需要传递相对复杂的数据,消息队列的特性更能满足需求。