Linux下的多线程与多进程编程实践
多线程编程基础
在Linux环境下进行多线程编程,我们主要使用POSIX线程库(Pthreads)。Pthreads提供了一系列函数来创建、管理和同步线程。
线程的创建与终止
- 线程创建
在Pthreads中,使用
pthread_create
函数来创建一个新线程。其函数原型如下:
#include <pthread.h>
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine) (void *), void *arg);
thread
:指向新线程标识符的指针。attr
:用于设置线程属性,通常可设为NULL
,表示使用默认属性。start_routine
:新线程开始执行的函数指针。arg
:传递给start_routine
函数的参数。
下面是一个简单的示例代码:
#include <stdio.h>
#include <pthread.h>
void* thread_function(void* arg) {
printf("This is a new thread, argument received: %d\n", *(int*)arg);
pthread_exit(NULL);
}
int main() {
pthread_t my_thread;
int arg = 42;
if (pthread_create(&my_thread, NULL, thread_function, &arg) != 0) {
perror("Failed to create thread");
return 1;
}
printf("Main thread is waiting for the new thread to finish...\n");
if (pthread_join(my_thread, NULL) != 0) {
perror("Failed to join thread");
return 2;
}
printf("New thread has finished, main thread is exiting.\n");
return 0;
}
在这个示例中,我们创建了一个新线程,并向其传递了一个整数参数。主线程等待新线程完成后再退出。
- 线程终止
线程可以通过以下几种方式终止:
- 从启动例程返回:如上述示例,线程函数
thread_function
执行完毕后通过pthread_exit
返回。 - 调用
pthread_exit
:线程可以在任何时候调用pthread_exit
函数来主动终止自身。其原型为:
- 从启动例程返回:如上述示例,线程函数
#include <pthread.h>
void pthread_exit(void *retval);
retval
为线程的返回值,可通过pthread_join
获取。
- 被其他线程取消:使用pthread_cancel
函数可以取消一个线程。其原型为:
#include <pthread.h>
int pthread_cancel(pthread_t thread);
被取消的线程需要设置合适的取消状态和清理处理函数来确保资源的正确释放。
线程同步
多线程编程中,由于多个线程可能同时访问共享资源,因此需要进行同步操作以避免数据竞争和不一致。
- 互斥锁(Mutex)
互斥锁是最基本的同步机制,用于保护共享资源,确保同一时间只有一个线程可以访问。
- 初始化互斥锁:
#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
通常attr
设为NULL
,使用默认属性。
- 锁定和解锁互斥锁:
#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
下面是一个使用互斥锁的示例:
#include <stdio.h>
#include <pthread.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int shared_variable = 0;
void* increment_thread(void* arg) {
for (int i = 0; i < 1000000; ++i) {
pthread_mutex_lock(&mutex);
shared_variable++;
pthread_mutex_unlock(&mutex);
}
pthread_exit(NULL);
}
void* decrement_thread(void* arg) {
for (int i = 0; i < 1000000; ++i) {
pthread_mutex_lock(&mutex);
shared_variable--;
pthread_mutex_unlock(&mutex);
}
pthread_exit(NULL);
}
int main() {
pthread_t inc_thread, dec_thread;
if (pthread_create(&inc_thread, NULL, increment_thread, NULL) != 0) {
perror("Failed to create increment thread");
return 1;
}
if (pthread_create(&dec_thread, NULL, decrement_thread, NULL) != 0) {
perror("Failed to create decrement thread");
return 1;
}
if (pthread_join(inc_thread, NULL) != 0) {
perror("Failed to join increment thread");
return 2;
}
if (pthread_join(dec_thread, NULL) != 0) {
perror("Failed to join decrement thread");
return 2;
}
printf("Final value of shared variable: %d\n", shared_variable);
pthread_mutex_destroy(&mutex);
return 0;
}
在这个示例中,两个线程分别对共享变量进行递增和递减操作,通过互斥锁确保每次只有一个线程能访问共享变量。
- 条件变量(Condition Variable)
条件变量用于线程间的同步通信,一个线程可以等待某个条件满足,而其他线程可以通知该条件已满足。
- 初始化条件变量:
#include <pthread.h>
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
- **等待条件变量**:
#include <pthread.h>
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
在调用pthread_cond_wait
前,需要先锁定互斥锁。该函数会自动解锁互斥锁并等待条件变量被唤醒,唤醒后会重新锁定互斥锁。
- 通知条件变量:
#include <pthread.h>
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
pthread_cond_signal
唤醒一个等待该条件变量的线程,pthread_cond_broadcast
唤醒所有等待该条件变量的线程。
下面是一个简单的生产者 - 消费者模型示例,使用条件变量进行同步:
#include <stdio.h>
#include <pthread.h>
#define BUFFER_SIZE 5
int buffer[BUFFER_SIZE];
int in = 0;
int out = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;
void* producer(void* arg) {
int item = 1;
while (1) {
pthread_mutex_lock(&mutex);
while ((in + 1) % BUFFER_SIZE == out) {
pthread_cond_wait(¬_full, &mutex);
}
buffer[in] = item;
printf("Produced: %d at position %d\n", item, in);
in = (in + 1) % BUFFER_SIZE;
pthread_cond_signal(¬_empty);
pthread_mutex_unlock(&mutex);
item++;
}
pthread_exit(NULL);
}
void* consumer(void* arg) {
while (1) {
pthread_mutex_lock(&mutex);
while (in == out) {
pthread_cond_wait(¬_empty, &mutex);
}
int item = buffer[out];
printf("Consumed: %d from position %d\n", item, out);
out = (out + 1) % BUFFER_SIZE;
pthread_cond_signal(¬_full);
pthread_mutex_unlock(&mutex);
}
pthread_exit(NULL);
}
int main() {
pthread_t producer_thread, consumer_thread;
if (pthread_create(&producer_thread, NULL, producer, NULL) != 0) {
perror("Failed to create producer thread");
return 1;
}
if (pthread_create(&consumer_thread, NULL, consumer, NULL) != 0) {
perror("Failed to create consumer thread");
return 1;
}
if (pthread_join(producer_thread, NULL) != 0) {
perror("Failed to join producer thread");
return 2;
}
if (pthread_join(consumer_thread, NULL) != 0) {
perror("Failed to join consumer thread");
return 2;
}
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(¬_full);
pthread_cond_destroy(¬_empty);
return 0;
}
在这个示例中,生产者线程向缓冲区中生产数据,消费者线程从缓冲区中消费数据。当缓冲区满时,生产者等待;当缓冲区空时,消费者等待。
多进程编程基础
在Linux下,多进程编程主要通过fork
系统调用实现。每个进程都有自己独立的地址空间和资源。
进程的创建与终止
- 进程创建
fork
函数用于创建一个新进程,新进程是原进程的副本。其函数原型为:
#include <unistd.h>
pid_t fork(void);
fork
调用一次,返回两次。在父进程中返回子进程的PID,在子进程中返回0。如果出错,返回 -1。
下面是一个简单的fork
示例:
#include <stdio.h>
#include <unistd.h>
int main() {
pid_t pid = fork();
if (pid == -1) {
perror("fork failed");
return 1;
} else if (pid == 0) {
printf("This is the child process, PID: %d\n", getpid());
} else {
printf("This is the parent process, PID: %d, Child PID: %d\n", getpid(), pid);
}
return 0;
}
在这个示例中,父进程创建一个子进程,然后父子进程分别输出自己的PID。
- 进程终止
进程可以通过以下几种方式终止:
- 调用
exit
函数:这是正常终止进程的方式。exit
函数会清理所有打开的文件描述符等资源,并向父进程返回一个状态码。其原型为:
- 调用
#include <stdlib.h>
void exit(int status);
- **从`main`函数返回**:这与调用`exit`效果相同,返回值会作为状态码传递给父进程。
- **调用`_exit`或`_Exit`**:这两个函数直接终止进程,不会执行标准I/O库的清理操作。`_exit`的原型为:
#include <unistd.h>
void _exit(int status);
进程间通信(IPC)
多个进程之间需要进行通信以协同工作。常见的IPC机制有管道、消息队列、共享内存和信号量等。
- 管道(Pipe)
管道是一种半双工的通信机制,数据只能单向流动。分为匿名管道和命名管道。
- 匿名管道:使用
pipe
函数创建,只能用于具有亲缘关系(父子进程等)的进程之间通信。其函数原型为:
- 匿名管道:使用
#include <unistd.h>
int pipe(int pipefd[2]);
pipefd[0]
用于读,pipefd[1]
用于写。
下面是一个父子进程通过匿名管道通信的示例:
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#define BUFFER_SIZE 1024
int main() {
int pipefd[2];
if (pipe(pipefd) == -1) {
perror("pipe");
return 1;
}
pid_t pid = fork();
if (pid == -1) {
perror("fork");
return 1;
} else if (pid == 0) {
close(pipefd[0]); // 子进程关闭读端
const char* message = "Hello from child!";
write(pipefd[1], message, strlen(message) + 1);
close(pipefd[1]);
} else {
close(pipefd[1]); // 父进程关闭写端
char buffer[BUFFER_SIZE];
ssize_t bytes_read = read(pipefd[0], buffer, sizeof(buffer));
if (bytes_read > 0) {
buffer[bytes_read - 1] = '\0'; // 确保字符串以\0结尾
printf("Parent received: %s\n", buffer);
}
close(pipefd[0]);
}
return 0;
}
在这个示例中,子进程向管道写入数据,父进程从管道读取数据。
- **命名管道(FIFO)**:命名管道可以在不相关的进程之间通信。通过`mkfifo`函数创建,其原型为:
#include <sys/types.h>
#include <sys/stat.h>
int mkfifo(const char *pathname, mode_t mode);
下面是一个使用命名管道进行两个不相关进程通信的示例: 发送端(sender.c):
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#define FIFO_PATH "/tmp/my_fifo"
#define BUFFER_SIZE 1024
int main() {
int fd = open(FIFO_PATH, O_WRONLY);
if (fd == -1) {
perror("open");
return 1;
}
const char* message = "Hello from sender!";
write(fd, message, strlen(message) + 1);
close(fd);
return 0;
}
接收端(receiver.c):
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <string.h>
#define FIFO_PATH "/tmp/my_fifo"
#define BUFFER_SIZE 1024
int main() {
if (mkfifo(FIFO_PATH, 0666) == -1 && errno != EEXIST) {
perror("mkfifo");
return 1;
}
int fd = open(FIFO_PATH, O_RDONLY);
if (fd == -1) {
perror("open");
return 1;
}
char buffer[BUFFER_SIZE];
ssize_t bytes_read = read(fd, buffer, sizeof(buffer));
if (bytes_read > 0) {
buffer[bytes_read - 1] = '\0';
printf("Receiver received: %s\n", buffer);
}
close(fd);
unlink(FIFO_PATH);
return 0;
}
在这个示例中,发送端进程向命名管道写入数据,接收端进程从命名管道读取数据。
- 消息队列
消息队列是一种有格式的、按队列顺序进行数据传递的IPC机制。通过
msgget
、msgsnd
和msgrcv
等函数实现。- 创建或获取消息队列:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflg);
key
是消息队列的标识符,msgflg
用于指定创建标志等。
- 发送消息:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
msgp
是指向消息结构体的指针,msgsz
是消息正文的长度。
- 接收消息:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
msgtyp
指定接收消息的类型。
下面是一个简单的消息队列示例:
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
#define MSG_SIZE 1024
#define MSG_TYPE 1
typedef struct {
long mtype;
char mtext[MSG_SIZE];
} message_t;
int main() {
key_t key = ftok(".", 'a');
if (key == -1) {
perror("ftok");
return 1;
}
int msqid = msgget(key, IPC_CREAT | 0666);
if (msqid == -1) {
perror("msgget");
return 1;
}
message_t msg;
msg.mtype = MSG_TYPE;
strcpy(msg.mtext, "Hello from message queue!");
if (msgsnd(msqid, &msg, strlen(msg.mtext) + 1, 0) == -1) {
perror("msgsnd");
return 1;
}
printf("Message sent.\n");
if (msgrcv(msqid, &msg, MSG_SIZE, MSG_TYPE, 0) == -1) {
perror("msgrcv");
return 1;
}
printf("Message received: %s\n", msg.mtext);
if (msgctl(msqid, IPC_RMID, NULL) == -1) {
perror("msgctl");
return 1;
}
return 0;
}
在这个示例中,我们创建一个消息队列,发送一条消息,然后再接收这条消息,并最终删除消息队列。
- 共享内存
共享内存允许不同进程访问同一块物理内存区域,是最快的IPC机制。通过
shmget
、shmat
和shmdt
等函数实现。- 创建或获取共享内存段:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
int shmget(key_t key, size_t size, int shmflg);
size
是共享内存段的大小。
- 附加共享内存段到进程地址空间:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
void *shmat(int shmid, const void *shmaddr, int shmflg);
- **分离共享内存段**:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
int shmdt(const void *shmaddr);
下面是一个简单的共享内存示例:
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <string.h>
#define SHM_SIZE 1024
int main() {
key_t key = ftok(".", 'a');
if (key == -1) {
perror("ftok");
return 1;
}
int shmid = shmget(key, SHM_SIZE, IPC_CREAT | 0666);
if (shmid == -1) {
perror("shmget");
return 1;
}
char *shared_memory = (char*)shmat(shmid, NULL, 0);
if (shared_memory == (void*)-1) {
perror("shmat");
return 1;
}
strcpy(shared_memory, "Hello from shared memory!");
printf("Message written to shared memory.\n");
char buffer[SHM_SIZE];
strcpy(buffer, shared_memory);
if (shmdt(shared_memory) == -1) {
perror("shmdt");
return 1;
}
if (shmctl(shmid, IPC_RMID, NULL) == -1) {
perror("shmctl");
return 1;
}
printf("Message read from shared memory: %s\n", buffer);
return 0;
}
在这个示例中,我们创建一个共享内存段,写入一条消息,然后读取这条消息,并最终删除共享内存段。
- 信号量
信号量用于控制对共享资源的访问数量。通过
semget
、semop
和semctl
等函数实现。- 创建或获取信号量集:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
int semget(key_t key, int nsems, int semflg);
nsems
是信号量集中信号量的个数。
- 操作信号量:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
int semop(int semid, struct sembuf *sops, unsigned nsops);
struct sembuf
结构体定义了信号量的操作,如增加或减少信号量的值。
- 控制信号量:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
int semctl(int semid, int semnum, int cmd, ...);
cmd
指定操作命令,如初始化信号量等。
下面是一个简单的信号量示例,用于控制对共享资源的访问:
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <unistd.h>
#define SEM_KEY 1234
#define SEM_COUNT 1
union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
};
void semaphore_p(int semid) {
struct sembuf sem_op;
sem_op.sem_num = 0;
sem_op.sem_op = -1;
sem_op.sem_flg = 0;
semop(semid, &sem_op, 1);
}
void semaphore_v(int semid) {
struct sembuf sem_op;
sem_op.sem_num = 0;
sem_op.sem_op = 1;
sem_op.sem_flg = 0;
semop(semid, &sem_op, 1);
}
int main() {
key_t key = SEM_KEY;
int semid = semget(key, SEM_COUNT, IPC_CREAT | 0666);
if (semid == -1) {
perror("semget");
return 1;
}
union semun sem_arg;
sem_arg.val = 1;
if (semctl(semid, 0, SETVAL, sem_arg) == -1) {
perror("semctl");
return 1;
}
pid_t pid = fork();
if (pid == -1) {
perror("fork");
return 1;
} else if (pid == 0) {
semaphore_p(semid);
printf("Child process entered critical section.\n");
sleep(2);
printf("Child process leaving critical section.\n");
semaphore_v(semid);
} else {
semaphore_p(semid);
printf("Parent process entered critical section.\n");
sleep(2);
printf("Parent process leaving critical section.\n");
semaphore_v(semid);
}
if (semctl(semid, 0, IPC_RMID, 0) == -1) {
perror("semctl");
return 1;
}
return 0;
}
在这个示例中,通过信号量控制父子进程对共享资源(这里模拟为临界区)的访问,确保同一时间只有一个进程能进入临界区。
多线程与多进程的比较与选择
-
资源占用
- 多线程:线程共享进程的地址空间和资源,如文件描述符、堆内存等。因此线程创建和销毁的开销相对较小,适合大量并发任务。但是,由于共享资源,线程间需要更复杂的同步机制来避免数据竞争。
- 多进程:每个进程都有自己独立的地址空间和资源,进程间数据相互隔离,安全性较高。但进程创建和销毁的开销较大,因为需要复制整个地址空间和资源。
-
同步机制
- 多线程:常用的同步机制有互斥锁、条件变量、读写锁等。由于线程共享资源,同步操作需要精细控制,否则容易出现死锁等问题。
- 多进程:进程间通信相对复杂,需要使用管道、消息队列、共享内存等IPC机制。同步操作可以通过信号量等方式实现,但相比于线程同步,其实现和维护成本较高。
-
应用场景
- 多线程:适用于I/O密集型任务,如网络服务器中处理多个客户端连接,因为线程间切换开销小,可以快速响应I/O操作。同时,对于一些需要共享大量数据的并发任务,多线程也是较好的选择。
- 多进程:适用于计算密集型任务,因为每个进程可以充分利用多核CPU的优势,且进程间相互隔离,不会因为一个进程的崩溃影响其他进程。对于一些对安全性要求较高,不希望共享资源的场景,多进程更为合适。
在实际应用中,需要根据具体的需求和场景来选择多线程还是多进程编程,有时也会结合使用两者,以达到最佳的性能和资源利用效果。
总结
在Linux环境下,多线程和多进程编程为我们提供了强大的并发处理能力。多线程适合I/O密集型任务和需要共享资源的场景,通过Pthreads库可以方便地实现线程的创建、管理和同步。多进程则更适合计算密集型任务和对安全性要求高的场景,通过fork
和各种IPC机制实现进程间的通信和同步。深入理解和掌握这两种编程模型,能够帮助我们开发出高效、稳定的后端应用程序,充分利用多核CPU和系统资源,提升应用的性能和响应能力。在实际项目中,应根据具体需求和场景,合理选择和运用多线程与多进程技术,以达到最优的开发效果。