MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python多线程编程实战

2023-04-061.4k 阅读

一、Python 多线程基础

在 Python 中,多线程编程是一种实现并发执行任务的方式。多线程允许在同一个进程内同时运行多个线程,每个线程可以独立执行一段代码。这在处理 I/O 密集型任务(如网络请求、文件读写等)时非常有用,因为线程在等待 I/O 操作完成时可以释放 CPU 资源,让其他线程有机会执行。

Python 的 threading 模块提供了对多线程编程的支持。下面是一个简单的示例,展示如何创建和启动一个线程:

import threading


def print_numbers():
    for i in range(10):
        print(i)


# 创建一个线程对象
thread = threading.Thread(target=print_numbers)
# 启动线程
thread.start()
# 等待线程执行完毕
thread.join()

在上述代码中,我们首先定义了一个函数 print_numbers,它会打印 0 到 9 的数字。然后我们使用 threading.Thread 创建了一个线程对象,并将 print_numbers 函数作为目标传递给它。接着通过调用 start() 方法启动线程,最后调用 join() 方法等待线程执行结束。

1.1 线程的状态

线程在其生命周期中有几种不同的状态:

  • 新建(New):当线程对象被创建但 start() 方法尚未被调用时,线程处于新建状态。
  • 就绪(Runnable):调用 start() 方法后,线程进入就绪状态。此时线程已经准备好运行,但不一定立即执行,它需要等待 CPU 调度。
  • 运行(Running):当 CPU 调度到该线程时,它进入运行状态并开始执行 target 函数中的代码。
  • 阻塞(Blocked):如果线程执行 I/O 操作、等待锁或调用 sleep() 等函数,它会进入阻塞状态。在阻塞状态下,线程不消耗 CPU 资源,直到阻塞条件解除,线程重新回到就绪状态。
  • 死亡(Dead):当线程的 target 函数执行完毕或者被异常终止时,线程进入死亡状态。

1.2 线程与进程的区别

  • 资源分配:进程是资源分配的基本单位,每个进程都有自己独立的地址空间、内存、数据栈等资源。而线程是 CPU 调度的基本单位,同一进程内的多个线程共享进程的资源,如内存空间、文件描述符等。
  • 上下文切换开销:进程间上下文切换开销较大,因为需要切换整个地址空间等资源。而线程间上下文切换开销相对较小,因为它们共享大部分资源,主要切换的是 CPU 寄存器等少量数据。
  • 创建和销毁开销:创建和销毁进程的开销比线程大,因为进程需要分配和释放大量系统资源。

二、多线程共享资源问题

由于多线程共享进程的资源,当多个线程同时访问和修改共享资源时,可能会引发一些问题,比如竞态条件(Race Condition)和死锁(Deadlock)。

2.1 竞态条件

竞态条件是指多个线程同时访问和修改共享资源,导致最终结果取决于线程执行的顺序。例如,下面的代码展示了一个简单的竞态条件场景:

import threading

counter = 0


def increment():
    global counter
    for _ in range(1000000):
        counter = counter + 1


threads = []
for _ in range(2):
    thread = threading.Thread(target=increment)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print("Final counter value:", counter)

在上述代码中,我们定义了一个全局变量 counter,并在 increment 函数中对其进行 100 万次的加 1 操作。我们创建了两个线程来执行这个函数。理论上,最终 counter 的值应该是 200 万,但由于竞态条件的存在,每次运行程序得到的结果可能都不一样。

这是因为 counter = counter + 1 这一操作不是原子性的,它实际上由三个步骤组成:读取 counter 的值、加 1 操作、将结果写回 counter。在多线程环境下,当一个线程读取了 counter 的值,还未完成写回操作时,另一个线程也读取了 counter 的值,这样就会导致部分加 1 操作丢失。

2.2 死锁

死锁是指两个或多个线程相互等待对方释放资源,从而导致所有线程都无法继续执行的情况。下面是一个简单的死锁示例:

import threading

lock1 = threading.Lock()
lock2 = threading.Lock()


def thread1():
    lock1.acquire()
    print("Thread 1 acquired lock1")
    lock2.acquire()
    print("Thread 1 acquired lock2")
    lock2.release()
    print("Thread 1 released lock2")
    lock1.release()
    print("Thread 1 released lock1")


def thread2():
    lock2.acquire()
    print("Thread 2 acquired lock2")
    lock1.acquire()
    print("Thread 2 acquired lock1")
    lock1.release()
    print("Thread 2 released lock1")
    lock2.release()
    print("Thread 2 released lock2")


t1 = threading.Thread(target=thread1)
t2 = threading.Thread(target=thread2)

t1.start()
t2.start()

t1.join()
t2.join()

在上述代码中,thread1 首先获取 lock1,然后尝试获取 lock2,而 thread2 首先获取 lock2,然后尝试获取 lock1。如果 thread1 获取了 lock1,同时 thread2 获取了 lock2,那么它们将相互等待对方释放锁,从而导致死锁。

三、解决共享资源问题的方法

为了解决多线程共享资源带来的问题,我们可以使用锁(Lock)、信号量(Semaphore)和队列(Queue)等工具。

3.1 锁(Lock)

锁是一种最简单的同步机制,它可以保证在同一时间只有一个线程能够访问共享资源。在 Python 中,可以使用 threading.Lock 来创建锁对象。下面是使用锁解决竞态条件问题的示例:

import threading

counter = 0
lock = threading.Lock()


def increment():
    global counter
    for _ in range(1000000):
        lock.acquire()
        try:
            counter = counter + 1
        finally:
            lock.release()


threads = []
for _ in range(2):
    thread = threading.Thread(target=increment)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print("Final counter value:", counter)

在上述代码中,我们在 increment 函数中使用 lock.acquire() 获取锁,这样在一个线程进入临界区(对 counter 进行操作的代码块)时,其他线程无法进入,直到该线程调用 lock.release() 释放锁。通过这种方式,我们保证了对 counter 的操作是原子性的,避免了竞态条件。

3.2 信号量(Semaphore)

信号量是一个计数器,它允许一定数量的线程同时访问共享资源。在 Python 中,可以使用 threading.Semaphore 来创建信号量对象。例如,下面的代码展示了如何使用信号量来限制同时访问资源的线程数量:

import threading
import time

semaphore = threading.Semaphore(3)


def access_resource(thread_id):
    semaphore.acquire()
    print(f"Thread {thread_id} acquired semaphore, accessing resource")
    time.sleep(2)
    print(f"Thread {thread_id} finished accessing resource")
    semaphore.release()


threads = []
for i in range(5):
    thread = threading.Thread(target=access_resource, args=(i,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

在上述代码中,我们创建了一个信号量 semaphore,其初始值为 3,表示最多允许 3 个线程同时访问共享资源。每个线程在访问资源前调用 semaphore.acquire(),访问结束后调用 semaphore.release()。这样,当有超过 3 个线程尝试获取信号量时,多余的线程将被阻塞,直到有线程释放信号量。

3.3 队列(Queue)

队列是一种线程安全的数据结构,它可以用于在多个线程之间安全地传递数据。在 Python 中,queue 模块提供了 Queue 类。下面是一个简单的示例,展示如何使用队列在两个线程之间传递数据:

import threading
import queue

q = queue.Queue()


def producer():
    for i in range(5):
        q.put(i)
        print(f"Produced {i}")
        time.sleep(1)


def consumer():
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Consumed {item}")
        time.sleep(2)
    q.task_done()


producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

producer_thread.join()
q.put(None)
consumer_thread.join()

在上述代码中,producer 线程将数据放入队列 q 中,consumer 线程从队列中取出数据进行处理。queue.get() 方法会阻塞线程,直到队列中有数据可用。通过这种方式,我们避免了共享资源的直接竞争,实现了线程间的安全通信。

四、多线程编程中的高级主题

4.1 线程池

线程池是一种管理和复用线程的机制,它可以避免频繁创建和销毁线程带来的开销。在 Python 中,可以使用 concurrent.futures 模块中的 ThreadPoolExecutor 来实现线程池。下面是一个简单的示例:

import concurrent.futures
import time


def task(num):
    print(f"Task {num} started")
    time.sleep(2)
    print(f"Task {num} finished")
    return num * num


with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(task, i) for i in range(5)]
    for future in concurrent.futures.as_completed(futures):
        result = future.result()
        print(f"Task result: {result}")

在上述代码中,我们创建了一个 ThreadPoolExecutor,并指定最大线程数为 3。然后我们使用 submit() 方法向线程池提交任务,submit() 方法会返回一个 Future 对象,通过这个对象可以获取任务的执行结果。as_completed() 函数会在任务完成时返回,我们可以通过 future.result() 获取任务的返回值。

4.2 线程本地数据(Thread - Local Data)

线程本地数据是指每个线程都有自己独立的一份数据副本,不同线程之间的数据互不干扰。在 Python 中,可以使用 threading.local 来实现线程本地数据。例如:

import threading

local_data = threading.local()


def worker():
    local_data.value = threading.current_thread().name
    print(f"Thread {local_data.value} has set its local data")
    time.sleep(1)
    print(f"Thread {local_data.value} is accessing its local data: {local_data.value}")


threads = []
for _ in range(3):
    thread = threading.Thread(target=worker)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

在上述代码中,我们创建了一个 threading.local 对象 local_data。每个线程都可以为 local_data 设置自己的属性值,而且这些值在不同线程之间是隔离的。

4.3 多线程与 GIL(全局解释器锁)

Python 的全局解释器锁(Global Interpreter Lock,GIL)是一个在 CPython 解释器中存在的机制,它确保在同一时间只有一个线程能够执行 Python 字节码。这意味着在多线程编程中,即使有多个 CPU 核心,Python 多线程也无法真正利用多核优势进行并行计算。

对于 I/O 密集型任务,GIL 对性能的影响较小,因为线程在等待 I/O 操作时会释放 GIL,让其他线程有机会执行。但对于 CPU 密集型任务,多线程可能无法达到预期的性能提升。

如果需要在 Python 中进行真正的并行计算,可以考虑使用 multiprocessing 模块,它通过创建多个进程来利用多核 CPU 的优势。

五、多线程编程实战案例

5.1 网络爬虫多线程优化

假设我们要编写一个简单的网络爬虫,从多个网页中获取数据。使用多线程可以显著提高爬虫的效率。下面是一个示例:

import threading
import requests
import queue

url_queue = queue.Queue()
result_queue = queue.Queue()


def fetch_url():
    while True:
        url = url_queue.get()
        if url is None:
            break
        try:
            response = requests.get(url)
            result_queue.put((url, response.text))
        except requests.RequestException as e:
            result_queue.put((url, str(e)))
        url_queue.task_done()


def main():
    urls = [
        "http://example.com",
        "http://example.org",
        "http://example.net"
    ]

    num_threads = 3
    threads = []

    for _ in range(num_threads):
        thread = threading.Thread(target=fetch_url)
        thread.start()
        threads.append(thread)

    for url in urls:
        url_queue.put(url)

    for _ in range(num_threads):
        url_queue.put(None)

    for thread in threads:
        thread.join()

    while not result_queue.empty():
        url, result = result_queue.get()
        print(f"URL: {url}, Result: {result}")


if __name__ == "__main__":
    main()

在上述代码中,我们创建了一个 url_queue 用于存放待爬取的 URL,一个 result_queue 用于存放爬取结果。fetch_url 函数从 url_queue 中取出 URL 进行爬取,并将结果放入 result_queue。我们创建了多个线程来执行 fetch_url 函数,从而实现多线程爬取网页。

5.2 文件处理多线程优化

假设我们有一个任务,需要对多个文本文件进行处理,比如统计每个文件中单词的出现次数。可以使用多线程来提高处理效率:

import threading
import os
import queue
from collections import Counter


file_queue = queue.Queue()
result_queue = queue.Queue()


def process_file():
    while True:
        file_path = file_queue.get()
        if file_path is None:
            break
        try:
            with open(file_path, 'r', encoding='utf - 8') as file:
                text = file.read()
                words = text.split()
                word_count = Counter(words)
                result_queue.put((file_path, word_count))
        except Exception as e:
            result_queue.put((file_path, str(e)))
        file_queue.task_done()


def main():
    folder_path = 'path/to/files'
    num_threads = 5
    threads = []

    for root, dirs, files in os.walk(folder_path):
        for file in files:
            if file.endswith('.txt'):
                file_queue.put(os.path.join(root, file))

    for _ in range(num_threads):
        thread = threading.Thread(target=process_file)
        thread.start()
        threads.append(thread)

    for _ in range(num_threads):
        file_queue.put(None)

    for thread in threads:
        thread.join()

    while not result_queue.empty():
        file_path, result = result_queue.get()
        print(f"File: {file_path}, Result: {result}")


if __name__ == "__main__":
    main()

在上述代码中,我们创建了一个 file_queue 用于存放待处理的文件路径,一个 result_queue 用于存放文件处理结果。process_file 函数从 file_queue 中取出文件路径,读取文件内容并统计单词出现次数,然后将结果放入 result_queue。通过多线程并行处理文件,提高了整体的处理效率。

六、性能调优与注意事项

6.1 性能调优

  • 合理设置线程数量:对于 I/O 密集型任务,可以根据系统的 I/O 性能和 CPU 核心数来适当增加线程数量,以充分利用系统资源。但线程数量过多也会增加上下文切换开销,降低性能。一般来说,可以通过测试不同线程数量下的性能来找到最优值。
  • 避免过度同步:虽然同步机制(如锁、信号量等)可以保证数据的一致性,但过多的同步操作会导致线程阻塞,降低并发性能。尽量减少临界区的代码范围,只对共享资源的关键操作进行同步。

6.2 注意事项

  • 异常处理:在多线程编程中,要注意对线程中的异常进行适当处理。如果一个线程中发生未捕获的异常,可能会导致整个程序崩溃或者出现不可预料的行为。可以在 target 函数中使用 try - except 语句来捕获异常,并进行相应处理。
  • 调试困难:多线程程序的调试比单线程程序更加困难,因为线程的执行顺序是不确定的,可能会导致一些难以重现的问题。可以使用 print 语句、日志记录或者调试工具(如 pdb)来辅助调试。

通过深入理解 Python 多线程编程的原理、掌握解决共享资源问题的方法以及实际应用中的技巧,我们可以编写出高效、稳定的多线程程序,充分发挥 Python 在并发编程方面的优势。在实际应用中,根据任务的特点(如 I/O 密集型还是 CPU 密集型)选择合适的并发方式(多线程、多进程等),可以进一步提升程序的性能。