Linux C语言消息队列编程入门
1. 消息队列简介
在Linux环境下的进程间通信(IPC, Inter - Process Communication)中,消息队列是一种重要的机制。它允许不同进程之间以消息的形式进行数据传递。与管道(pipe)和FIFO(命名管道)相比,消息队列提供了一种异步、有类型的通信方式。
每个消息队列都有一个唯一的标识符,进程通过这个标识符来访问消息队列。消息在队列中以记录的形式存在,每个记录包含一个消息类型和消息正文。这种结构使得接收进程可以根据消息类型有选择地接收消息,而不是像管道那样只能按照先进先出的顺序接收数据。
2. 消息队列相关系统调用
2.1 msgget函数
msgget函数用于创建一个新的消息队列或获取一个已有的消息队列。其函数原型如下:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflg);
key
:是一个键值,它可以通过ftok
函数生成,用于唯一标识一个消息队列。不同进程只要使用相同的key
,就可以访问同一个消息队列。msgflg
:用于指定消息队列的创建标志和访问权限。例如IPC_CREAT
表示如果消息队列不存在则创建它,IPC_EXCL
与IPC_CREAT
一起使用时,如果消息队列已存在则返回错误。权限部分类似文件权限,如0666
表示可读可写。
返回值:成功时返回消息队列的标识符(一个非负整数),失败时返回 - 1。
2.2 msgsnd函数
msgsnd函数用于向消息队列发送消息。其函数原型如下:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
msqid
:是由msgget
函数返回的消息队列标识符。msgp
:是一个指向消息缓冲区的指针。这个缓冲区必须是一个结构体,其第一个成员必须是long
类型,用于指定消息类型,后面的成员是消息正文。
struct mymsgbuf {
long mtype;
char mtext[256];
};
msgsz
:是消息正文的长度(不包括mtype
)。msgflg
:用于控制发送行为。如果设置了IPC_NOWAIT
,当消息队列满时,函数会立即返回错误,而不是等待队列有空间。
返回值:成功时返回0,失败时返回 - 1。
2.3 msgrcv函数
msgrcv函数用于从消息队列接收消息。其函数原型如下:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
msqid
:消息队列标识符。msgp
:指向接收消息缓冲区的指针,与msgsnd
中使用的结构体类型相同。msgsz
:指定接收缓冲区的大小(不包括mtype
)。msgtyp
:用于指定要接收的消息类型。如果msgtyp
为0,则接收队列中的第一个消息;如果msgtyp
大于0,则接收类型等于msgtyp
的第一个消息;如果msgtyp
小于0,则接收类型小于或等于msgtyp
绝对值的消息中类型最小的第一个消息。msgflg
:用于控制接收行为。IPC_NOWAIT
标志表示如果没有符合条件的消息,函数立即返回错误,而不是等待。
返回值:成功时返回接收到的消息正文的长度(不包括mtype
),失败时返回 - 1。
2.4 msgctl函数
msgctl函数用于对消息队列进行控制操作,如删除消息队列、获取或设置消息队列的属性等。其函数原型如下:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
msqid
:消息队列标识符。cmd
:指定要执行的操作。常见的操作有IPC_STAT
(获取消息队列的属性并存储在buf
中)、IPC_SET
(设置消息队列的属性,属性值取自buf
)和IPC_RMID
(删除消息队列)。buf
:指向一个msqid_ds
结构体的指针,用于获取或设置消息队列的属性。
返回值:成功时返回0,失败时返回 - 1。
3. 示例代码 - 简单的消息队列通信
下面是一个简单的示例代码,展示了如何在两个进程(一个发送进程和一个接收进程)之间使用消息队列进行通信。
3.1 发送端代码(sender.c)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#define MSG_SIZE 256
// 定义消息结构体
struct mymsgbuf {
long mtype;
char mtext[MSG_SIZE];
};
int main() {
key_t key;
int msqid;
struct mymsgbuf msg;
// 生成唯一的键值
key = ftok(".", 'a');
if (key == -1) {
perror("ftok");
return 1;
}
// 获取或创建消息队列
msqid = msgget(key, IPC_CREAT | 0666);
if (msqid == -1) {
perror("msgget");
return 1;
}
// 设置消息类型和内容
msg.mtype = 1;
strcpy(msg.mtext, "Hello, message queue!");
// 发送消息
if (msgsnd(msqid, &msg, strlen(msg.mtext), 0) == -1) {
perror("msgsnd");
return 1;
}
printf("Message sent successfully.\n");
// 不需要再使用消息队列,删除它
if (msgctl(msqid, IPC_RMID, NULL) == -1) {
perror("msgctl IPC_RMID");
return 1;
}
return 0;
}
3.2 接收端代码(receiver.c)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#define MSG_SIZE 256
// 定义消息结构体
struct mymsgbuf {
long mtype;
char mtext[MSG_SIZE];
};
int main() {
key_t key;
int msqid;
struct mymsgbuf msg;
ssize_t len;
// 生成唯一的键值,必须与发送端相同
key = ftok(".", 'a');
if (key == -1) {
perror("ftok");
return 1;
}
// 获取消息队列
msqid = msgget(key, 0666);
if (msqid == -1) {
perror("msgget");
return 1;
}
// 接收消息
len = msgrcv(msqid, &msg, MSG_SIZE, 1, 0);
if (len == -1) {
perror("msgrcv");
return 1;
}
msg.mtext[len] = '\0';
printf("Received message: %s\n", msg.mtext);
return 0;
}
4. 编译和运行示例代码
- 编译发送端代码:在终端中进入存放
sender.c
文件的目录,执行以下命令:
gcc -o sender sender.c
- 编译接收端代码:同样在该目录下,执行以下命令编译
receiver.c
:
gcc -o receiver receiver.c
- 运行程序:
- 先运行接收端程序,在一个终端中执行
./receiver
,它会等待消息。 - 然后在另一个终端中运行发送端程序
./sender
,发送消息。接收端接收到消息后会打印出来。
- 先运行接收端程序,在一个终端中执行
5. 消息队列的属性与限制
每个消息队列都有一些属性,这些属性可以通过msgctl
函数的IPC_STAT
命令获取,通过IPC_SET
命令设置。msqid_ds
结构体定义了这些属性:
struct msqid_ds {
struct ipc_perm msg_perm;
time_t msg_stime;
time_t msg_rtime;
time_t msg_ctime;
unsigned long msg_lcbytes;
unsigned long msg_lqbytes;
long msg_qnum;
long msg_qbytes;
pid_t msg_lspid;
pid_t msg_lrpid;
};
msg_perm
:包含消息队列的权限和所有者信息。msg_stime
:最后一次发送消息的时间。msg_rtime
:最后一次接收消息的时间。msg_ctime
:最后一次改变消息队列状态的时间。msg_qnum
:当前队列中的消息数量。msg_qbytes
:队列允许的最大字节数。
Linux系统对消息队列也有一些限制,例如每个消息队列的最大字节数、每个消息的最大长度等。这些限制可以通过修改内核参数来调整,但一般情况下不建议随意修改,除非有明确的需求。
6. 消息队列的应用场景
- 分布式系统中的异步通信:在分布式系统中,不同节点之间可能需要进行异步的数据传递。消息队列可以作为一种可靠的异步通信机制,解耦不同模块之间的依赖关系。例如,一个订单处理系统中,订单生成模块可以将订单信息发送到消息队列,而订单处理模块可以从消息队列中获取订单并进行处理,这样即使订单处理模块暂时不可用,订单生成模块也不会受到影响。
- 日志记录:应用程序可以将日志信息发送到消息队列,然后由专门的日志处理进程从队列中读取日志并进行存储、分析等操作。这种方式可以提高应用程序的性能,因为应用程序不需要等待日志处理完成,而是将日志发送到队列后继续执行其他任务。
- 任务调度:在一个复杂的系统中,可能有许多任务需要按照一定的顺序或条件执行。消息队列可以用于任务的调度,将任务以消息的形式发送到队列中,调度程序根据消息类型和其他条件从队列中取出任务并执行。
7. 消息队列与其他IPC机制的比较
- 与管道和FIFO的比较:
- 同步性:管道和FIFO是同步通信机制,数据的发送和接收是阻塞的,即发送进程在管道或FIFO满时会等待,接收进程在管道或FIFO空时会等待。而消息队列可以通过设置
IPC_NOWAIT
标志实现异步通信。 - 数据类型:管道和FIFO只能以字节流的形式传递数据,接收方无法根据数据类型有选择地接收。消息队列则可以根据消息类型进行选择性接收。
- 生命周期:管道的生命周期与创建它的进程相同,进程结束管道就消失。FIFO的生命周期独立于进程,但在文件系统中存在。消息队列的生命周期独立于进程,直到被显式删除。
- 同步性:管道和FIFO是同步通信机制,数据的发送和接收是阻塞的,即发送进程在管道或FIFO满时会等待,接收进程在管道或FIFO空时会等待。而消息队列可以通过设置
- 与共享内存的比较:
- 数据拷贝:共享内存是最快的IPC机制,因为它直接在多个进程之间共享内存区域,不需要数据拷贝。而消息队列在发送和接收消息时需要进行数据拷贝。
- 同步控制:共享内存本身不提供同步机制,需要进程自己实现同步,如使用信号量。消息队列则提供了一定的同步机制,例如可以控制消息的发送和接收顺序。
8. 消息队列的错误处理
在使用消息队列的系统调用时,可能会遇到各种错误。常见的错误及其原因如下:
- EACCES:权限不足。例如,进程没有足够的权限访问消息队列,或者尝试以不允许的方式(如写入只读队列)操作消息队列。
- EEXIST:在使用
msgget
函数时,如果指定了IPC_CREAT | IPC_EXCL
标志,且消息队列已经存在,则会返回此错误。 - EIDRM:消息队列已被删除。在尝试对已删除的消息队列进行操作(如发送或接收消息)时会出现此错误。
- EINVAL:参数无效。例如,在
msgsnd
函数中,msgsz
大于系统允许的最大消息长度,或者在msgctl
函数中,指定的cmd
参数无效。 - ENOMEM:系统内存不足,无法满足消息队列操作的需求,如创建新的消息队列或发送大消息。
当系统调用返回错误时,应该使用perror
函数打印错误信息,以便调试和定位问题。例如:
if (msgsnd(msqid, &msg, strlen(msg.mtext), 0) == -1) {
perror("msgsnd");
return 1;
}
9. 多进程和多线程环境下的消息队列使用
9.1 多进程环境
在多进程环境下使用消息队列时,需要注意进程同步的问题。由于多个进程可能同时访问消息队列,可能会出现竞争条件。例如,两个进程同时尝试向消息队列发送消息,可能会导致数据混乱。
为了解决这个问题,可以使用信号量来进行同步。信号量可以控制对共享资源(如消息队列)的访问,确保同一时间只有一个进程能够对其进行操作。
以下是一个简单的示例,展示了如何使用信号量来同步多进程对消息队列的访问:
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/sem.h>
#include <unistd.h>
#define MSG_SIZE 256
#define SEM_KEY 1234
// 定义消息结构体
struct mymsgbuf {
long mtype;
char mtext[MSG_SIZE];
};
// 信号量操作函数
union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
};
void semaphore_p(int semid) {
struct sembuf sem_op;
sem_op.sem_num = 0;
sem_op.sem_op = -1;
sem_op.sem_flg = 0;
semop(semid, &sem_op, 1);
}
void semaphore_v(int semid) {
struct sembuf sem_op;
sem_op.sem_num = 0;
sem_op.sem_op = 1;
sem_op.sem_flg = 0;
semop(semid, &sem_op, 1);
}
int main() {
key_t key;
int msqid, semid;
struct mymsgbuf msg;
// 生成唯一的键值
key = ftok(".", 'a');
if (key == -1) {
perror("ftok");
return 1;
}
// 获取或创建消息队列
msqid = msgget(key, IPC_CREAT | 0666);
if (msqid == -1) {
perror("msgget");
return 1;
}
// 创建信号量
semid = semget(SEM_KEY, 1, IPC_CREAT | 0666);
if (semid == -1) {
perror("semget");
return 1;
}
// 初始化信号量
union semun sem_union;
sem_union.val = 1;
if (semctl(semid, 0, SETVAL, sem_union) == -1) {
perror("semctl SETVAL");
return 1;
}
// 子进程发送消息
if (fork() == 0) {
semaphore_p(semid);
msg.mtype = 1;
strcpy(msg.mtext, "Message from child process");
if (msgsnd(msqid, &msg, strlen(msg.mtext), 0) == -1) {
perror("msgsnd in child");
exit(1);
}
semaphore_v(semid);
exit(0);
}
// 父进程发送消息
semaphore_p(semid);
msg.mtype = 1;
strcpy(msg.mtext, "Message from parent process");
if (msgsnd(msqid, &msg, strlen(msg.mtext), 0) == -1) {
perror("msgsnd in parent");
return 1;
}
semaphore_v(semid);
// 等待子进程结束
wait(NULL);
// 删除消息队列和信号量
if (msgctl(msqid, IPC_RMID, NULL) == -1) {
perror("msgctl IPC_RMID");
return 1;
}
if (semctl(semid, 0, IPC_RMID) == -1) {
perror("semctl IPC_RMID");
return 1;
}
return 0;
}
9.2 多线程环境
在多线程环境下使用消息队列,同样需要注意同步问题。由于线程共享进程的地址空间,多个线程对消息队列的并发访问也可能导致数据不一致。
可以使用互斥锁(mutex)来保护对消息队列的操作。以下是一个简单的示例:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#define MSG_SIZE 256
// 定义消息结构体
struct mymsgbuf {
long mtype;
char mtext[MSG_SIZE];
};
// 互斥锁
pthread_mutex_t mutex;
// 线程函数
void *thread_function(void *arg) {
int msqid = *((int *)arg);
struct mymsgbuf msg;
pthread_mutex_lock(&mutex);
msg.mtype = 1;
strcpy(msg.mtext, "Message from thread");
if (msgsnd(msqid, &msg, strlen(msg.mtext), 0) == -1) {
perror("msgsnd in thread");
}
pthread_mutex_unlock(&mutex);
return NULL;
}
int main() {
key_t key;
int msqid;
pthread_t thread;
// 生成唯一的键值
key = ftok(".", 'a');
if (key == -1) {
perror("ftok");
return 1;
}
// 获取或创建消息队列
msqid = msgget(key, IPC_CREAT | 0666);
if (msqid == -1) {
perror("msgget");
return 1;
}
// 初始化互斥锁
if (pthread_mutex_init(&mutex, NULL) != 0) {
perror("pthread_mutex_init");
return 1;
}
// 创建线程
if (pthread_create(&thread, NULL, thread_function, &msqid) != 0) {
perror("pthread_create");
return 1;
}
// 主线程发送消息
struct mymsgbuf msg;
pthread_mutex_lock(&mutex);
msg.mtype = 1;
strcpy(msg.mtext, "Message from main thread");
if (msgsnd(msqid, &msg, strlen(msg.mtext), 0) == -1) {
perror("msgsnd in main");
}
pthread_mutex_unlock(&mutex);
// 等待线程结束
if (pthread_join(thread, NULL) != 0) {
perror("pthread_join");
return 1;
}
// 删除消息队列
if (msgctl(msqid, IPC_RMID, NULL) == -1) {
perror("msgctl IPC_RMID");
return 1;
}
// 销毁互斥锁
if (pthread_mutex_destroy(&mutex) != 0) {
perror("pthread_mutex_destroy");
return 1;
}
return 0;
}
通过上述方式,在多进程和多线程环境下可以安全地使用消息队列进行进程间或线程间的通信。