MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python threading模块中的Thread类解析

2022-03-042.1k 阅读

1. 简介

在Python的多线程编程中,threading模块是一个强大且常用的工具。其中,Thread类作为threading模块的核心组件,负责创建和管理线程。通过Thread类,开发者能够轻松地实现多线程并发执行任务,充分利用多核CPU的优势,提高程序的运行效率。

2. 类的定义及初始化

Thread类的定义位于threading模块中,其基本的初始化语法如下:

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
  • group:这个参数预留用于将来扩展线程组的功能,目前必须设置为None
  • target:指定线程要执行的可调用对象(函数)。如果不指定,线程启动后不会执行任何用户定义的代码。
  • name:线程的名称。如果不指定,Python会自动为线程分配一个唯一的名称,格式为Thread-N,其中N是一个递增的数字。
  • args:传递给target函数的位置参数,以元组的形式表示。
  • kwargs:传递给target函数的关键字参数,以字典的形式表示。
  • daemon:设置线程是否为守护线程。守护线程会在主线程结束时自动结束,而不管它们是否完成任务。如果不指定,默认值取决于创建线程的线程是否为守护线程。主线程默认是非守护线程,因此在主线程中创建的新线程默认也为非守护线程。

示例代码:

import threading


def worker(num):
    print(f"Worker {num} is working")


threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

在上述代码中,我们创建了5个线程,每个线程都执行worker函数,并传递不同的参数num

3. 线程的启动与运行

一旦创建了Thread实例,就可以通过调用start()方法来启动线程。start()方法会安排线程在单独的控制线程中执行target函数。

当线程启动后,它会开始执行target函数。如果在初始化Thread实例时没有指定target,线程启动后除了执行一些内部初始化操作外,不会执行任何用户定义的代码。

示例代码:

import threading
import time


def print_delay(message, delay):
    time.sleep(delay)
    print(message)


t1 = threading.Thread(target=print_delay, args=("Hello after 2 seconds", 2))
t2 = threading.Thread(target=print_delay, args=("Hello after 1 second", 1))

t1.start()
t2.start()

在这个例子中,t1线程会在延迟2秒后打印消息,t2线程会在延迟1秒后打印消息。由于两个线程是并发执行的,t2线程的消息会先于t1线程的消息打印出来。

4. 线程的阻塞与等待

有时候,我们需要主线程等待某个或某些线程执行完毕后再继续执行。这可以通过调用线程的join()方法来实现。join()方法会阻塞调用它的线程(通常是主线程),直到被调用join()的线程执行完毕。

join()方法还可以接受一个可选的timeout参数,用于指定等待线程结束的最长时间(以秒为单位)。如果在timeout指定的时间内线程没有结束,join()方法会返回,调用线程继续执行。

示例代码:

import threading
import time


def long_running_task():
    time.sleep(3)
    print("Long running task is done")


t = threading.Thread(target=long_running_task)
t.start()

print("Waiting for the thread to finish...")
t.join()
print("Thread has finished, main thread continues")

在上述代码中,主线程会等待long_running_task线程执行完毕(睡眠3秒)后,才会打印“Thread has finished, main thread continues”。

如果我们设置timeout参数:

import threading
import time


def long_running_task():
    time.sleep(3)
    print("Long running task is done")


t = threading.Thread(target=long_running_task)
t.start()

print("Waiting for the thread to finish...")
t.join(1)
print("Timeout reached or thread has finished, main thread continues")

在这个例子中,主线程只会等待1秒。由于long_running_task线程需要3秒才能完成,主线程在等待1秒后会继续执行,打印“Timeout reached or thread has finished, main thread continues”,而此时long_running_task线程仍在执行。

5. 线程的属性与方法

除了start()join()方法外,Thread类还有其他一些重要的属性和方法。

  • name:获取或设置线程的名称。
  • ident:获取线程的标识符。线程启动后,该属性会返回一个唯一的整数值,代表该线程。如果线程尚未启动,该属性为None
  • is_alive():判断线程是否处于活动状态(即是否已经启动且尚未结束)。
  • daemon:获取或设置线程是否为守护线程。

示例代码:

import threading
import time


def short_task():
    time.sleep(1)
    print("Short task is done")


t = threading.Thread(target=short_task)
print(f"Thread name before start: {t.name}")
print(f"Thread is alive before start: {t.is_alive()}")
print(f"Thread ident before start: {t.ident}")
print(f"Thread is daemon before start: {t.daemon}")

t.start()

print(f"Thread name after start: {t.name}")
print(f"Thread is alive after start: {t.is_alive()}")
print(f"Thread ident after start: {t.ident}")
print(f"Thread is daemon after start: {t.daemon}")

t.join()

print(f"Thread is alive after join: {t.is_alive()}")

在上述代码中,我们在不同阶段获取线程的属性并打印,展示了线程属性在不同状态下的变化。

6. 自定义线程类

除了使用Thread类并传递target函数外,我们还可以通过继承Thread类来自定义线程类。自定义线程类需要重写run()方法,该方法中包含线程要执行的代码。

示例代码:

import threading


class MyThread(threading.Thread):
    def __init__(self, message):
        super().__init__()
        self.message = message

    def run(self):
        print(self.message)


threads = []
messages = ["Hello from thread 1", "Hello from thread 2", "Hello from thread 3"]
for msg in messages:
    t = MyThread(msg)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

在这个例子中,我们定义了MyThread类,继承自threading.Thread。通过重写run()方法,每个线程实例在启动后会打印出各自的消息。

7. 线程间的共享数据与同步

当多个线程同时访问和修改共享数据时,可能会导致数据竞争和不一致的问题。为了解决这些问题,Python提供了多种同步机制,如锁(Lock)、信号量(Semaphore)、事件(Event)和条件变量(Condition)等。

7.1 锁(Lock)

锁是一种最基本的同步工具。它只有两种状态:锁定和未锁定。当一个线程获取了锁(将其状态设为锁定),其他线程就无法获取该锁,直到该线程释放锁(将其状态设为未锁定)。

示例代码:

import threading

counter = 0
lock = threading.Lock()


def increment():
    global counter
    with lock:
        counter += 1
        print(f"Counter incremented to {counter}")


threads = []
for _ in range(10):
    t = threading.Thread(target=increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

在上述代码中,lock用于保护对counter的修改。with lock语句块会自动获取和释放锁,确保在任何时刻只有一个线程能够修改counter的值,避免了数据竞争问题。

7.2 信号量(Semaphore)

信号量是一个计数器,它允许一定数量的线程同时访问共享资源。当一个线程获取信号量时,计数器减1;当线程释放信号量时,计数器加1。当计数器为0时,其他线程无法获取信号量,直到有线程释放信号量。

示例代码:

import threading
import time

semaphore = threading.Semaphore(3)


def limited_resource_task():
    with semaphore:
        print(f"{threading.current_thread().name} has access to the limited resource")
        time.sleep(2)
        print(f"{threading.current_thread().name} is done with the limited resource")


threads = []
for i in range(5):
    t = threading.Thread(target=limited_resource_task)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

在这个例子中,信号量允许最多3个线程同时访问有限资源。当一个线程进入with semaphore语句块时,它获取信号量,计数器减1;当线程离开语句块时,释放信号量,计数器加1。

7.3 事件(Event)

事件是一种简单的线程同步机制,它允许一个线程通知其他线程发生了某个事件。事件对象有一个内部标志,线程可以等待这个标志被设置(使用wait()方法),也可以设置这个标志(使用set()方法)。

示例代码:

import threading
import time


event = threading.Event()


def waiting_thread():
    print("Waiting thread is waiting for the event...")
    event.wait()
    print("Waiting thread has received the event")


def signaling_thread():
    time.sleep(3)
    print("Signaling thread is setting the event")
    event.set()


t1 = threading.Thread(target=waiting_thread)
t2 = threading.Thread(target=signaling_thread)

t1.start()
t2.start()

t1.join()
t2.join()

在上述代码中,waiting_thread线程调用event.wait()方法等待事件发生,signaling_thread线程在睡眠3秒后调用event.set()方法设置事件,从而唤醒waiting_thread线程。

7.4 条件变量(Condition)

条件变量结合了锁和事件的功能,它允许线程在满足特定条件时等待,并且在条件满足时通知其他等待的线程。

示例代码:

import threading


condition = threading.Condition()
queue = []


def producer():
    with condition:
        for i in range(5):
            queue.append(i)
            print(f"Produced {i}")
            condition.notify()
            time.sleep(1)


def consumer():
    with condition:
        while True:
            if not queue:
                condition.wait()
            item = queue.pop(0)
            print(f"Consumed {item}")
            time.sleep(1)


t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)

t1.start()
t2.start()

t1.join()
t2.join()

在这个例子中,producer线程向queue中添加元素,并通过condition.notify()通知等待的线程;consumer线程在queue为空时调用condition.wait()等待通知,当收到通知且queue中有元素时,从queue中取出元素并消费。

8. 线程安全与非线程安全

在多线程编程中,理解线程安全和非线程安全的概念非常重要。

  • 线程安全:一个对象或函数如果在多线程环境下能够正确工作,不会因为多个线程同时访问和修改而导致数据错误或不一致,那么它就是线程安全的。例如,Python的Queue模块中的Queue类是线程安全的,多个线程可以安全地向队列中放入和取出元素。
  • 非线程安全:如果一个对象或函数在多线程环境下可能会因为并发访问而出现数据错误或不一致,那么它就是非线程安全的。例如,普通的Python列表(list)和字典(dict)在多线程环境下如果不使用同步机制进行保护,就可能出现数据竞争问题。

示例代码展示非线程安全的情况:

import threading

my_list = []


def add_to_list():
    for i in range(1000):
        my_list.append(i)


threads = []
for _ in range(10):
    t = threading.Thread(target=add_to_list)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(len(my_list))

在上述代码中,多个线程同时向my_list中添加元素,由于没有同步机制保护,最终my_list的长度可能会小于10000,因为存在数据竞争问题。

9. 多线程编程的注意事项

  • 资源竞争:要时刻注意共享资源的访问,合理使用同步机制来避免资源竞争和数据不一致问题。
  • 死锁:死锁是多线程编程中一个常见且难以调试的问题。当两个或多个线程相互等待对方释放资源时,就会发生死锁。为了避免死锁,应尽量按照相同的顺序获取锁,避免嵌套锁,并设置合理的超时机制。
  • 性能开销:虽然多线程可以提高程序的并发性能,但线程的创建、切换和同步也会带来一定的性能开销。在设计多线程程序时,需要权衡并发带来的性能提升和线程管理开销之间的关系。
  • 全局解释器锁(GIL):Python的全局解释器锁会限制同一时刻只有一个线程在解释器中执行字节码。这意味着在CPU密集型任务中,多线程可能无法充分利用多核CPU的优势。对于CPU密集型任务,可以考虑使用multiprocessing模块来利用多核CPU。

10. 总结

threading模块中的Thread类是Python多线程编程的核心组件。通过合理使用Thread类及其相关的同步机制,开发者能够编写出高效、并发的程序。在实际应用中,需要深入理解线程的运行机制、同步原理以及可能出现的问题,从而编写出健壮、可靠的多线程程序。无论是在网络编程、数据处理还是其他需要并发执行任务的场景中,Thread类都为开发者提供了强大的工具。