MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

并发编程中的线程安全与可重入性设计

2023-06-271.4k 阅读

并发编程基础概念

在深入探讨线程安全与可重入性之前,我们先来回顾一下并发编程中的一些基础概念。

进程与线程

进程:是计算机中程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位。每个进程都有独立的地址空间、内存、数据栈以及其他记录其运行状态的辅助数据。例如,当我们打开一个浏览器应用,这就是一个进程,它有自己独立的资源来执行各种任务,如加载网页、播放视频等。

线程:是进程中的一个执行单元,是程序执行的最小单位。一个进程可以包含多个线程,这些线程共享进程的资源,如内存空间、文件描述符等。以浏览器进程为例,其中可能有负责页面渲染的线程、负责网络请求的线程等,它们在同一个进程空间内协同工作。

并发与并行

并发:指在同一时间段内,多个任务交替执行。操作系统通过时间片轮转等调度算法,让 CPU 在多个任务间快速切换,使得在宏观上看起来多个任务好像是同时在执行。比如单核 CPU 上同时运行多个程序,CPU 快速地在各个程序对应的任务间切换,给用户造成多个程序同时运行的错觉。

并行:指在同一时刻,多个任务真正地同时执行。这需要多核 CPU 的支持,每个核可以独立地执行一个任务,实现真正意义上的同时运行。例如,在多核服务器上,不同的核可以同时处理不同的网络请求。

线程安全问题的本质

共享资源与竞争条件

当多个线程共享某些资源(如内存中的变量、文件等)时,如果这些线程对共享资源进行读写操作,就可能引发线程安全问题。竞争条件是指多个线程同时访问和修改共享资源,导致最终结果依赖于线程执行的顺序。

假设我们有一个简单的计数器变量 counter,两个线程都对其进行加 1 操作。在单线程环境下,执行两次加 1 操作后 counter 的值应该为 2。但在多线程环境中,如果两个线程同时读取 counter 的值(假设初始值为 0),然后各自进行加 1 操作并写回,最终 counter 的值可能是 1 而不是 2。这就是因为两个线程竞争对 counter 的访问,产生了竞争条件。

下面是一段简单的 Python 代码示例来演示这种竞争条件:

import threading

counter = 0


def increment():
    global counter
    for _ in range(1000000):
        counter = counter + 1


threads = []
for _ in range(2):
    t = threading.Thread(target=increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print("Final counter value:", counter)

在这段代码中,我们期望 counter 最终的值为 2000000,但由于竞争条件,实际运行结果往往小于这个值。

内存可见性问题

除了竞争条件,内存可见性也是线程安全的一个重要问题。现代计算机系统为了提高性能,CPU 会有高速缓存,并且编译器和处理器会对指令进行重排序。这就导致不同线程对共享变量的修改,在其他线程中可能不会立即可见。

例如,线程 A 修改了共享变量 x,但这个修改可能暂时只存在于线程 A 对应的 CPU 高速缓存中,还未写回到主内存。此时线程 B 从主内存读取 x,就会读到旧的值,从而导致数据不一致。

实现线程安全的方法

互斥锁(Mutex)

互斥锁是一种最基本的线程同步机制,它通过保证同一时间只有一个线程能够访问共享资源,从而避免竞争条件。当一个线程获取了互斥锁,其他线程就必须等待,直到该线程释放互斥锁。

在 C++ 中,使用标准库的 std::mutex 来实现互斥锁。以下是修改上述计数器示例的代码:

#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;
int counter = 0;

void increment() {
    for (int i = 0; i < 1000000; ++i) {
        mtx.lock();
        counter++;
        mtx.unlock();
    }
}

int main() {
    std::thread t1(increment);
    std::thread t2(increment);

    t1.join();
    t2.join();

    std::cout << "Final counter value: " << counter << std::endl;
    return 0;
}

在这段代码中,通过 mtx.lock()mtx.unlock() 来保护对 counter 的操作,确保同一时间只有一个线程能修改 counter,从而解决了竞争条件问题。

读写锁(Read - Write Lock)

读写锁区分了读操作和写操作。允许多个线程同时进行读操作,因为读操作不会修改共享资源,不会产生竞争条件。但写操作时,必须独占资源,以防止数据不一致。

在 Java 中,可以使用 ReentrantReadWriteLock 来实现读写锁。以下是一个简单的示例:

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteExample {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private int data = 0;

    public void read() {
        lock.readLock().lock();
        try {
            System.out.println("Reading data: " + data);
        } finally {
            lock.readLock().unlock();
        }
    }

    public void write(int newData) {
        lock.writeLock().lock();
        try {
            data = newData;
            System.out.println("Writing data: " + data);
        } finally {
            lock.writeLock().unlock();
        }
    }
}

在这个示例中,read 方法使用读锁,多个线程可以同时调用 read 方法进行读操作。而 write 方法使用写锁,当一个线程在执行 write 方法时,其他线程无论是读还是写操作都必须等待。

信号量(Semaphore)

信号量可以控制同时访问共享资源的线程数量。它内部维护一个计数器,当线程获取信号量时,计数器减 1;当线程释放信号量时,计数器加 1。当计数器为 0 时,其他线程无法获取信号量,只能等待。

在 Python 中,threading.Semaphore 可以实现信号量。以下是一个简单的示例,模拟一个停车场,最多允许 3 辆车进入:

import threading
import time

semaphore = threading.Semaphore(3)


def car(car_num):
    semaphore.acquire()
    print(f"Car {car_num} has entered the parking lot.")
    time.sleep(2)
    print(f"Car {car_num} is leaving the parking lot.")
    semaphore.release()


for i in range(5):
    t = threading.Thread(target=car, args=(i,))
    t.start()

在这个示例中,Semaphore(3) 表示最多允许 3 个线程(即 3 辆车)同时进入停车场。当超过 3 辆车尝试进入时,多余的车辆(线程)就需要等待。

可重入性概念

什么是可重入性

可重入性是指一个函数可以被多个线程同时调用,并且在执行过程中被中断后再次进入,不会出现数据错误或其他异常情况。一个可重入函数必须满足以下条件:

  1. 不依赖于静态或全局变量,除非这些变量是线程安全的。
  2. 不持有任何持久化的状态,除了函数参数和局部变量。
  3. 不调用任何不可重入的函数。

例如,一个简单的数学计算函数 add

int add(int a, int b) {
    return a + b;
}

这个函数不依赖于任何外部的静态或全局变量,只依赖于传入的参数,所以它是可重入的。多个线程可以同时调用这个函数,并且不会出现线程安全问题。

可重入函数与线程安全的关系

可重入函数通常是线程安全的,但线程安全的函数不一定是可重入的。线程安全主要关注的是多个线程同时访问共享资源时的数据一致性,而可重入性更强调函数在被中断后再次进入的正确性。

例如,一个函数内部使用了静态变量来保存中间结果:

int calculate(int num) {
    static int result = 0;
    result += num;
    return result;
}

这个函数虽然通过一些同步机制(如互斥锁)可以保证线程安全,但它不是可重入的,因为它依赖于静态变量 result。如果在函数执行过程中被中断,另一个线程调用该函数修改了 result,当原线程继续执行时,结果就会出错。

实现可重入性的方法

避免使用静态和全局变量

如上述 add 函数的示例,通过仅依赖函数参数和局部变量进行计算,避免了对静态和全局变量的依赖,从而保证了可重入性。

使用线程本地存储(Thread - Local Storage,TLS)

线程本地存储允许每个线程拥有自己独立的变量副本。在程序运行时,每个线程访问的变量都是自己的副本,不会与其他线程的副本相互干扰。

在 C++ 中,可以使用 __thread 关键字(GCC 扩展)来声明线程本地存储变量。以下是一个示例:

#include <iostream>
#include <thread>

__thread int threadLocalValue = 0;

void incrementThreadLocal() {
    for (int i = 0; i < 1000; ++i) {
        threadLocalValue++;
    }
    std::cout << "Thread local value in this thread: " << threadLocalValue << std::endl;
}

int main() {
    std::thread t1(incrementThreadLocal);
    std::thread t2(incrementThreadLocal);

    t1.join();
    t2.join();

    return 0;
}

在这个示例中,threadLocalValue 是一个线程本地存储变量。每个线程在执行 incrementThreadLocal 函数时,操作的都是自己的 threadLocalValue 副本,所以不会出现线程安全问题,并且函数是可重入的。

正确处理函数调用

可重入函数不能调用不可重入的函数。如果一个函数调用了其他函数,需要确保被调用的函数也是可重入的。例如,假设我们有一个不可重入的函数 nonReentrantFunction

static int globalResult = 0;

void nonReentrantFunction(int num) {
    globalResult += num;
}

如果在一个可重入函数中调用 nonReentrantFunction,那么这个函数就不再是可重入的了。因此,在设计函数时,要仔细考虑函数间的调用关系,确保整个调用链都是可重入的。

实际应用中的线程安全与可重入性设计

多线程服务器编程

在多线程服务器编程中,线程安全和可重入性设计至关重要。例如,一个处理客户端请求的服务器,可能会有多个线程同时处理不同客户端的请求。服务器可能共享一些资源,如数据库连接池、缓存等。

假设我们使用 Java 编写一个简单的多线程服务器,处理客户端的登录请求。服务器需要验证用户信息,可能会查询数据库。我们可以使用连接池来管理数据库连接,并且要确保对连接池的访问是线程安全的。

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class LoginServer {
    private static final String DB_URL = "jdbc:mysql://localhost:3306/mydb";
    private static final String DB_USER = "root";
    private static final String DB_PASSWORD = "password";
    private static final int POOL_SIZE = 10;

    private final BlockingQueue<Connection> connectionPool;

    public LoginServer() {
        connectionPool = new ArrayBlockingQueue<>(POOL_SIZE);
        for (int i = 0; i < POOL_SIZE; ++i) {
            try {
                Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);
                connectionPool.add(conn);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public boolean validateUser(String username, String password) {
        Connection conn = null;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        try {
            conn = connectionPool.take();
            String sql = "SELECT * FROM users WHERE username =? AND password =?";
            pstmt = conn.prepareStatement(sql);
            pstmt.setString(1, username);
            pstmt.setString(2, password);
            rs = pstmt.executeQuery();
            return rs.next();
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        } finally {
            if (rs!= null) {
                try {
                    rs.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if (pstmt!= null) {
                try {
                    pstmt.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if (conn!= null) {
                try {
                    connectionPool.put(conn);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

在这个示例中,LoginServer 使用 BlockingQueue 实现了一个简单的数据库连接池。validateUser 方法从连接池中获取连接,执行数据库查询,然后将连接放回连接池。通过这种方式,确保了多个线程对连接池的安全访问,体现了线程安全设计。同时,该方法没有依赖静态或全局变量,是可重入的。

并发数据结构的设计

在并发编程中,设计线程安全的并发数据结构也是非常重要的。例如,设计一个线程安全的队列。

在 Python 中,queue.Queue 就是一个线程安全的队列。以下是一个简单的示例,展示如何使用它:

import threading
import queue

q = queue.Queue()


def producer():
    for i in range(10):
        q.put(i)
        print(f"Produced: {i}")


def consumer():
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Consumed: {item}")
        q.task_done()


producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

producer_thread.join()
q.put(None)  # 通知消费者结束
consumer_thread.join()

在这个示例中,queue.Queue 内部使用了锁机制来保证线程安全。生产者线程将数据放入队列,消费者线程从队列中取出数据,不会出现竞争条件。并且,由于队列操作的原子性,相关的函数(如 putget)在多线程环境下是可重入的。

总结线程安全与可重入性的要点

  1. 线程安全:核心是解决多个线程对共享资源的竞争访问问题,通过互斥锁、读写锁、信号量等同步机制来保证数据一致性。在设计时,要明确哪些资源是共享的,以及如何保护这些共享资源。
  2. 可重入性:强调函数在多线程环境下被中断后再次进入的正确性。通过避免使用静态和全局变量(除非线程安全)、使用线程本地存储以及确保函数调用链的可重入性来实现。
  3. 关系与应用:可重入函数通常是线程安全的,但线程安全函数不一定可重入。在实际应用中,如多线程服务器编程和并发数据结构设计,要同时考虑线程安全与可重入性,以构建健壮、高效的并发程序。

在并发编程领域,线程安全与可重入性设计是保证程序正确性和稳定性的关键。开发者需要深入理解这些概念,并根据具体的应用场景选择合适的设计方法和同步机制,从而编写出高质量的并发代码。