MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python并发编程实战:构建高效并发应用

2023-08-113.7k 阅读

Python并发编程基础

并发与并行的概念

在深入探讨Python并发编程之前,我们首先要明确并发(Concurrency)与并行(Parallelism)这两个容易混淆的概念。

并发指的是在一段时间内,多个任务看上去像是同时执行。实际上,在单核CPU环境下,操作系统通过快速地在多个任务之间切换,让用户感觉这些任务是同时运行的。例如,我们在电脑上同时打开浏览器、音乐播放器和文档编辑器,操作系统会为每个应用程序分配一定的CPU时间片,轮流执行它们的代码,从而实现并发执行。

并行则是指在同一时刻,多个任务真正地同时执行。这需要多核CPU的支持,每个核心可以独立处理一个任务,从而实现真正的并行计算。例如,在一个4核CPU的电脑上,4个不同的计算任务可以分别在4个核心上同时运行。

Python中的并发编程模型

Python提供了多种并发编程模型,主要包括多线程(Threading)、多进程(Multiprocessing)和异步I/O(Asynchronous I/O)。

多线程

多线程是Python中最基本的并发编程模型之一。线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。在Python中,我们可以使用threading模块来创建和管理线程。

以下是一个简单的多线程示例,创建两个线程分别打印不同的信息:

import threading


def print_message(message):
    print(message)


thread1 = threading.Thread(target=print_message, args=('Hello from thread 1',))
thread2 = threading.Thread(target=print_message, args=('Hello from thread 2',))

thread1.start()
thread2.start()

thread1.join()
thread2.join()

在上述代码中,我们首先定义了一个print_message函数,它接受一个消息参数并打印出来。然后,我们使用threading.Thread类创建了两个线程thread1thread2,分别传入print_message函数以及不同的消息作为参数。通过调用start方法启动线程,join方法等待线程执行完毕。

然而,Python中的多线程有一个局限性,那就是全局解释器锁(Global Interpreter Lock,GIL)。GIL是Python解释器中的一个机制,它确保在同一时刻只有一个线程能够执行Python字节码。这意味着在多核CPU环境下,Python多线程并不能充分利用多核优势实现真正的并行计算,主要适用于I/O密集型任务,如网络请求、文件读写等。

多进程

为了克服GIL的限制,实现真正的并行计算,Python提供了multiprocessing模块来进行多进程编程。进程是计算机中已运行程序的实体,每个进程都有自己独立的内存空间和资源。

以下是一个简单的多进程示例,与上述多线程示例功能类似:

import multiprocessing


def print_message(message):
    print(message)


if __name__ == '__main__':
    process1 = multiprocessing.Process(target=print_message, args=('Hello from process 1',))
    process2 = multiprocessing.Process(target=print_message, args=('Hello from process 2',))

    process1.start()
    process2.start()

    process1.join()
    process2.join()

在这个示例中,我们使用multiprocessing.Process类创建了两个进程process1process2。注意,在使用multiprocessing模块时,通常需要将相关代码放在if __name__ == '__main__':语句块中,这是为了在Windows系统上确保进程启动的正确性。

多进程适用于CPU密集型任务,因为每个进程都可以独立地利用一个CPU核心进行计算。但由于进程间通信(Inter - Process Communication,IPC)比线程间通信更复杂,资源开销也更大,所以在选择多进程还是多线程时需要根据具体任务特性来决定。

异步I/O

异步I/O是一种在不阻塞主线程的情况下执行I/O操作的编程模型。在Python中,主要通过asyncio库来实现异步编程。异步编程特别适合处理大量I/O操作的场景,如网络爬虫、网络服务器等。

以下是一个简单的异步I/O示例,模拟异步网络请求:

import asyncio


async def fetch_data():
    print('Start fetching')
    await asyncio.sleep(2)
    print('Finished fetching')
    return {'data': 'Some fetched data'}


async def main():
    task = asyncio.create_task(fetch_data())
    value = await task
    print(value)


if __name__ == '__main__':
    asyncio.run(main())

在上述代码中,我们定义了一个fetch_data异步函数,使用await asyncio.sleep(2)模拟一个耗时2秒的网络请求。main函数中,我们通过asyncio.create_task创建一个任务,然后使用await等待任务完成并获取返回值。最后,通过asyncio.run来运行整个异步程序。

异步编程的核心在于asyncawait关键字。async用于定义异步函数,await用于暂停异步函数的执行,等待一个可等待对象(如asyncio.sleep返回的对象)完成,然后继续执行。这种方式使得在等待I/O操作完成的同时,主线程可以去执行其他任务,大大提高了程序的效率。

多线程编程深入

线程同步与锁

在多线程编程中,当多个线程同时访问和修改共享资源时,可能会导致数据竞争和不一致的问题。例如,多个线程同时对一个全局变量进行加1操作,如果没有适当的同步机制,最终的结果可能并不是我们期望的。

为了解决这个问题,我们可以使用锁(Lock)。锁是一种同步原语,它有两种状态:锁定(locked)和未锁定(unlocked)。当一个线程获取到锁(将锁的状态设为锁定),其他线程就无法再获取,直到该线程释放锁(将锁的状态设为未锁定)。

以下是一个使用锁来避免数据竞争的示例:

import threading

counter = 0
lock = threading.Lock()


def increment():
    global counter
    for _ in range(1000000):
        lock.acquire()
        try:
            counter += 1
        finally:
            lock.release()


thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=increment)

thread1.start()
thread2.start()

thread1.join()
thread2.join()

print(f"Final counter value: {counter}")

在这个示例中,我们定义了一个全局变量counter和一个锁lockincrement函数中,在对counter进行加1操作之前,通过lock.acquire()获取锁,操作完成后通过lock.release()释放锁。使用try - finally语句块可以确保即使在操作过程中出现异常,锁也能被正确释放。

条件变量

条件变量(Condition Variable)是另一种线程同步工具,它允许线程在满足特定条件时等待,在条件满足时被唤醒。条件变量通常与锁一起使用。

以下是一个使用条件变量的生产者 - 消费者模型示例:

import threading
import queue


class Producer(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        for i in range(10):
            item = f"Item {i}"
            self.queue.put(item)
            print(f"Produced: {item}")


class Consumer(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            item = self.queue.get()
            if item is None:
                break
            print(f"Consumed: {item}")
            self.queue.task_done()


if __name__ == '__main__':
    q = queue.Queue()

    producer = Producer(q)
    consumer = Consumer(q)

    producer.start()
    consumer.start()

    producer.join()
    q.put(None)
    consumer.join()

在这个示例中,Producer线程负责生成数据并放入队列q中,Consumer线程从队列中取出数据并处理。queue.Queue本身已经实现了线程安全,它内部使用了锁和条件变量来管理线程间的同步。Consumer线程通过queue.get()方法获取数据,该方法会在队列为空时阻塞,直到有数据可用。producer.join()确保生产者线程完成生产任务后,向队列中放入一个None值作为结束信号,consumer.join()等待消费者线程处理完所有数据并退出。

信号量

信号量(Semaphore)是一个计数器,它控制同时访问共享资源的线程数量。当一个线程获取信号量时,计数器减1;当线程释放信号量时,计数器加1。当计数器为0时,其他线程无法获取信号量,只能等待。

以下是一个使用信号量控制同时访问资源的线程数量的示例:

import threading
import time


semaphore = threading.Semaphore(3)


def access_resource(thread_num):
    semaphore.acquire()
    try:
        print(f"Thread {thread_num} is accessing the resource")
        time.sleep(2)
        print(f"Thread {thread_num} has finished accessing the resource")
    finally:
        semaphore.release()


for i in range(5):
    thread = threading.Thread(target=access_resource, args=(i,))
    thread.start()

在这个示例中,我们创建了一个信号量semaphore,其初始值为3,表示最多允许3个线程同时访问共享资源。每个线程在访问资源前通过semaphore.acquire()获取信号量,访问完成后通过semaphore.release()释放信号量。这样,同一时刻最多只有3个线程能够访问资源,其他线程需要等待。

多进程编程深入

进程间通信

在多进程编程中,由于每个进程都有自己独立的内存空间,进程间通信(IPC)变得尤为重要。Python的multiprocessing模块提供了多种IPC方式,如管道(Pipe)、队列(Queue)和共享内存(Shared Memory)。

管道

管道是一种简单的进程间通信方式,它可以在两个进程之间传递数据。管道有一个读端和一个写端,数据从写端写入,从读端读出。

以下是一个使用管道进行进程间通信的示例:

import multiprocessing


def sender(conn):
    conn.send('Hello from sender')
    conn.close()


def receiver(conn):
    message = conn.recv()
    print(f"Received: {message}")
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()

    p1 = multiprocessing.Process(target=sender, args=(child_conn,))
    p2 = multiprocessing.Process(target=receiver, args=(parent_conn,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

在这个示例中,我们通过multiprocessing.Pipe()创建了一个管道,返回两个连接对象parent_connchild_connsender进程通过child_conn.send()发送消息,receiver进程通过parent_conn.recv()接收消息。

队列

队列也是一种常用的进程间通信方式,它可以在多个进程之间安全地传递数据。multiprocessing.Queuethreading.Queue类似,但适用于进程间通信。

以下是一个使用队列进行进程间通信的示例:

import multiprocessing


def producer(queue):
    for i in range(5):
        item = f"Item {i}"
        queue.put(item)
        print(f"Produced: {item}")


def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Consumed: {item}")


if __name__ == '__main__':
    q = multiprocessing.Queue()

    p1 = multiprocessing.Process(target=producer, args=(q,))
    p2 = multiprocessing.Process(target=consumer, args=(q,))

    p1.start()
    p2.start()

    p1.join()
    q.put(None)
    p2.join()

在这个示例中,producer进程将数据放入队列qconsumer进程从队列中取出数据。与线程间通信的队列类似,这里也通过放入None值作为结束信号。

共享内存

共享内存允许不同进程访问同一块内存区域,从而实现数据共享。在multiprocessing模块中,可以使用ValueArray来创建共享内存对象。

以下是一个使用共享内存的示例:

import multiprocessing


def increment_shared_value(shared_value):
    with shared_value.get_lock():
        shared_value.value += 1


if __name__ == '__main__':
    shared_value = multiprocessing.Value('i', 0)

    processes = []
    for _ in range(10):
        p = multiprocessing.Process(target=increment_shared_value, args=(shared_value,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f"Final shared value: {shared_value.value}")

在这个示例中,我们使用multiprocessing.Value('i', 0)创建了一个初始值为0的共享整数对象shared_valueincrement_shared_value函数通过shared_value.get_lock()获取锁,确保对共享值的修改是线程安全的。多个进程同时对共享值进行加1操作,最终输出共享值的结果。

进程池

在实际应用中,创建和销毁进程的开销较大。为了提高效率,可以使用进程池(Process Pool)来管理一组进程。进程池可以预先创建一定数量的进程,当有任务时,将任务分配给池中的进程执行,任务完成后,进程并不销毁,而是等待下一个任务。

以下是一个使用进程池的示例:

import multiprocessing


def square(x):
    return x * x


if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(square, range(10))
        print(results)

在这个示例中,我们使用multiprocessing.Pool(processes = 4)创建了一个包含4个进程的进程池。pool.map方法将square函数应用到range(10)的每个元素上,返回一个包含计算结果的列表。with语句确保在使用完进程池后,自动关闭并等待所有进程完成任务,释放资源。

异步编程深入

异步函数与协程

在Python的异步编程中,异步函数是通过async关键字定义的函数,它返回一个协程对象(coroutine object)。协程是一种轻量级的线程,它可以在执行过程中暂停并将执行权交给其他协程,从而实现异步执行。

以下是一个简单的异步函数示例:

import asyncio


async def greet():
    print('Hello')
    await asyncio.sleep(1)
    print('World')


if __name__ == '__main__':
    asyncio.run(greet())

在这个示例中,greet函数是一个异步函数,它首先打印Hello,然后通过await asyncio.sleep(1)暂停1秒,最后打印Worldasyncio.run用于运行这个异步函数。

事件循环

事件循环(Event Loop)是异步编程的核心。它负责管理和调度协程的执行,不断地检查是否有可执行的协程任务,将执行权分配给它们。在Python的asyncio库中,可以通过asyncio.get_running_loop()获取当前的事件循环对象。

以下是一个手动管理事件循环的示例:

import asyncio


async def task1():
    print('Task 1 started')
    await asyncio.sleep(2)
    print('Task 1 finished')


async def task2():
    print('Task 2 started')
    await asyncio.sleep(1)
    print('Task 2 finished')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        tasks = [task1(), task2()]
        loop.run_until_complete(asyncio.gather(*tasks))
    finally:
        loop.close()

在这个示例中,我们首先定义了两个异步任务task1task2。然后通过asyncio.get_event_loop()获取事件循环对象loop。使用loop.run_until_complete方法,将asyncio.gather(*tasks)作为参数传入,asyncio.gather用于将多个协程包装成一个可等待对象,事件循环会调度这些协程的执行,直到所有任务完成。最后,通过loop.close()关闭事件循环。

异步上下文管理器

异步上下文管理器(Asynchronous Context Manager)是一种特殊的对象,它支持异步的enterexit操作。在异步编程中,当需要在异步函数中管理资源(如数据库连接、文件句柄等)时,异步上下文管理器非常有用。

以下是一个自定义异步上下文管理器的示例:

import asyncio


class AsyncResource:
    async def __aenter__(self):
        print('Entering async resource')
        await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print('Exiting async resource')
        await asyncio.sleep(1)


async def use_async_resource():
    async with AsyncResource() as resource:
        print('Using async resource')


if __name__ == '__main__':
    asyncio.run(use_async_resource())

在这个示例中,我们定义了一个AsyncResource类,它实现了__aenter____aexit__方法,分别用于进入和退出异步上下文。在use_async_resource函数中,使用async with语句来管理AsyncResource的生命周期,确保在进入和退出上下文时执行相应的异步操作。

并发编程在实际项目中的应用

网络爬虫

在网络爬虫项目中,并发编程可以显著提高爬取效率。例如,我们可以使用异步I/O来同时发起多个HTTP请求,避免在等待一个请求响应时浪费时间。

以下是一个简单的异步网络爬虫示例,使用aiohttp库来发送异步HTTP请求:

import asyncio
import aiohttp


async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()


async def main():
    urls = [
        'http://example.com',
        'http://example.org',
        'http://example.net'
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for result in results:
            print(result)


if __name__ == '__main__':
    asyncio.run(main())

在这个示例中,fetch函数使用aiohttp.ClientSession发送异步HTTP GET请求,并返回响应的文本内容。main函数中,我们创建了多个fetch任务,并使用asyncio.gather等待所有任务完成,最后打印每个任务的结果。

网络服务器

在开发网络服务器时,并发编程可以处理大量的并发连接。例如,使用asyncio库可以构建高性能的异步网络服务器。

以下是一个简单的异步TCP服务器示例:

import asyncio


async def handle_connection(reader, writer):
    data = await reader.read(1024)
    message = data.decode('utf - 8')
    addr = writer.get_extra_info('peername')
    print(f"Received {message!r} from {addr!r}")

    response = f"Hello, you sent: {message}"
    writer.write(response.encode('utf - 8'))
    await writer.drain()

    writer.close()


async def main():
    server = await asyncio.start_server(
        handle_connection, '127.0.0.1', 8888)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()


if __name__ == '__main__':
    asyncio.run(main())

在这个示例中,handle_connection函数处理每个客户端连接,读取客户端发送的数据,返回一个响应,并关闭连接。main函数通过asyncio.start_server启动一个TCP服务器,绑定到127.0.0.1:8888,并使用server.serve_forever()持续监听客户端连接。

数据分析与处理

在数据分析与处理任务中,如果涉及到大量数据的计算和处理,多进程编程可以利用多核CPU的优势提高处理速度。例如,对一个大型数据集进行并行计算。

以下是一个使用多进程进行数据处理的示例,计算列表中每个数的平方:

import multiprocessing


def square(x):
    return x * x


if __name__ == '__main__':
    data = list(range(1000000))
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(square, data)
    print(results)

在这个示例中,我们将一个包含100万个数字的列表data作为输入,通过进程池的map方法并行计算每个数的平方,大大提高了计算效率。

通过以上对Python并发编程的深入探讨,我们了解了不同并发编程模型的原理、使用方法以及在实际项目中的应用。在实际开发中,根据具体的任务需求和场景,选择合适的并发编程模型,可以构建出高效、稳定的应用程序。无论是处理I/O密集型任务还是CPU密集型任务,Python都提供了强大的工具和库来实现并发编程,提升程序的性能和响应能力。