MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Go语言条件变量(sync.Cond)实战

2022-01-077.1k 阅读

Go 语言条件变量(sync.Cond)概述

在并发编程中,我们常常会遇到需要多个 goroutine 之间进行协作的场景。比如,一个 goroutine 可能需要等待某个条件满足后才能继续执行,而这个条件可能由其他 goroutine 来改变。Go 语言的 sync.Cond 类型正是为了解决这类问题而设计的。

sync.Cond 基于一个共享的 sync.Locker(通常是 sync.Mutexsync.RWMutex)来实现条件变量的功能。它允许一个或多个 goroutine 等待某个条件满足,当条件满足时,其他 goroutine 可以通过 Cond 的方法来唤醒等待的 goroutine。

sync.Cond 的结构与方法

sync.Cond 的结构体定义如下:

type Cond struct {
    noCopy noCopy

    // L is held while observing or changing the condition
    L Locker

    notify  notifyList
    checker copyChecker
}

其中,L 是一个实现了 Locker 接口的锁,在操作条件变量时需要持有该锁。

sync.Cond 提供了以下几个重要的方法:

  1. NewCond 函数:用于创建一个新的 Cond 实例。
func NewCond(l Locker) *Cond

该函数接受一个实现了 Locker 接口的锁作为参数,并返回一个指向新创建的 Cond 实例的指针。

  1. Wait 方法:调用该方法会使当前 goroutine 阻塞,直到被唤醒。
func (c *Cond) Wait()

在调用 Wait 方法前,必须先获取 Cond 关联的锁 LWait 方法会释放锁,并将当前 goroutine 加入等待队列,然后阻塞。当该 goroutine 被唤醒时,Wait 方法会重新获取锁并返回。

  1. Signal 方法:唤醒等待队列中的一个 goroutine。
func (c *Cond) Signal()

同样,在调用 Signal 方法前,必须先获取 Cond 关联的锁 L。该方法会从等待队列中随机选择一个 goroutine 并唤醒它。

  1. Broadcast 方法:唤醒等待队列中的所有 goroutine。
func (c *Cond) Broadcast()

Signal 方法类似,调用 Broadcast 方法前也需要先获取锁 L。该方法会唤醒等待队列中的所有 goroutine。

简单示例:生产者 - 消费者模型

生产者 - 消费者模型是并发编程中一个经典的模型,非常适合用来演示 sync.Cond 的使用。

package main

import (
    "fmt"
    "sync"
)

type Queue struct {
    data []int
    size int
    cond *sync.Cond
    mutex sync.Mutex
}

func NewQueue(size int) *Queue {
    q := &Queue{
        data: make([]int, 0, size),
        size: size,
    }
    q.cond = sync.NewCond(&q.mutex)
    return q
}

func (q *Queue) Enqueue(item int) {
    q.mutex.Lock()
    defer q.mutex.Unlock()

    for len(q.data) == q.size {
        q.cond.Wait()
    }
    q.data = append(q.data, item)
    q.cond.Signal()
}

func (q *Queue) Dequeue() int {
    q.mutex.Lock()
    defer q.mutex.Unlock()

    for len(q.data) == 0 {
        q.cond.Wait()
    }
    item := q.data[0]
    q.data = q.data[1:]
    q.cond.Signal()
    return item
}

func main() {
    queue := NewQueue(2)
    var wg sync.WaitGroup

    wg.Add(2)
    go func() {
        defer wg.Done()
        for i := 0; i < 3; i++ {
            queue.Enqueue(i)
            fmt.Printf("Produced: %d\n", i)
        }
    }()

    go func() {
        defer wg.Done()
        for i := 0; i < 3; i++ {
            item := queue.Dequeue()
            fmt.Printf("Consumed: %d\n", item)
        }
    }()

    wg.Wait()
}

在这个示例中,我们定义了一个 Queue 结构体,它包含一个整数切片 data 用于存储数据,size 表示队列的最大容量,cond 是一个 sync.Cond 实例,mutex 是一个互斥锁。

NewQueue 函数用于创建一个新的队列实例,并初始化 condmutex

Enqueue 方法用于向队列中添加元素。在添加元素前,会检查队列是否已满,如果已满,则调用 cond.Wait() 方法阻塞当前 goroutine,直到队列有空间。添加元素后,调用 cond.Signal() 方法唤醒一个等待的 goroutine。

Dequeue 方法用于从队列中取出元素。在取出元素前,会检查队列是否为空,如果为空,则调用 cond.Wait() 方法阻塞当前 goroutine,直到队列中有元素。取出元素后,调用 cond.Signal() 方法唤醒一个等待的 goroutine。

main 函数中,我们创建了一个容量为 2 的队列,并启动了一个生产者 goroutine 和一个消费者 goroutine。生产者 goroutine 向队列中添加 3 个元素,消费者 goroutine 从队列中取出 3 个元素。由于队列容量有限,生产者和消费者会根据队列的状态进行等待和唤醒操作,从而实现了生产者 - 消费者模型。

复杂示例:多生产者 - 多消费者模型

接下来,我们看一个更复杂的多生产者 - 多消费者模型的示例。

package main

import (
    "fmt"
    "sync"
)

type TaskQueue struct {
    tasks []int
    capacity int
    cond *sync.Cond
    mutex sync.Mutex
}

func NewTaskQueue(capacity int) *TaskQueue {
    tq := &TaskQueue{
        tasks: make([]int, 0, capacity),
        capacity: capacity,
    }
    tq.cond = sync.NewCond(&tq.mutex)
    return tq
}

func (tq *TaskQueue) AddTask(task int) {
    tq.mutex.Lock()
    defer tq.mutex.Unlock()

    for len(tq.tasks) == tq.capacity {
        tq.cond.Wait()
    }
    tq.tasks = append(tq.tasks, task)
    tq.cond.Broadcast()
}

func (tq *TaskQueue) ProcessTask() int {
    tq.mutex.Lock()
    defer tq.mutex.Unlock()

    for len(tq.tasks) == 0 {
        tq.cond.Wait()
    }
    task := tq.tasks[0]
    tq.tasks = tq.tasks[1:]
    tq.cond.Broadcast()
    return task
}

func main() {
    taskQueue := NewTaskQueue(3)
    var wg sync.WaitGroup

    numProducers := 3
    numConsumers := 2

    for i := 0; i < numProducers; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 3; j++ {
                task := id*3 + j
                taskQueue.AddTask(task)
                fmt.Printf("Producer %d added task: %d\n", id, task)
            }
        }(i)
    }

    for i := 0; i < numConsumers; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 5; j++ {
                task := taskQueue.ProcessTask()
                fmt.Printf("Consumer %d processed task: %d\n", id, task)
            }
        }(i)
    }

    wg.Wait()
}

在这个示例中,我们定义了一个 TaskQueue 结构体,与前面的 Queue 类似,但这里用于处理更复杂的多生产者和多消费者场景。

AddTask 方法用于生产者向任务队列中添加任务。如果队列已满,生产者会等待。添加任务后,调用 cond.Broadcast() 方法唤醒所有等待的消费者,因为可能有多个消费者在等待任务。

ProcessTask 方法用于消费者从任务队列中取出任务。如果队列为空,消费者会等待。取出任务后,同样调用 cond.Broadcast() 方法唤醒所有等待的生产者,因为可能有多个生产者在等待队列有空间。

main 函数中,我们创建了一个容量为 3 的任务队列,并启动了 3 个生产者 goroutine 和 2 个消费者 goroutine。每个生产者会添加 3 个任务,每个消费者会处理 5 个任务。通过这种方式,展示了多生产者 - 多消费者模型下 sync.Cond 的使用。

sync.Cond 使用注意事项

  1. 锁的正确使用:在调用 sync.Cond 的任何方法(WaitSignalBroadcast)前,必须先获取 Cond 关联的锁 L,并且在操作完成后及时释放锁。否则,会导致竞态条件或死锁。
  2. 等待条件的判断:在调用 Wait 方法前,应该使用 for 循环来检查条件是否满足,而不是简单的 if 语句。这是因为 Wait 方法可能会被虚假唤醒(在没有调用 SignalBroadcast 的情况下被唤醒),使用 for 循环可以确保在条件真正满足时才继续执行。
  3. 选择合适的唤醒方法:根据具体的业务需求,选择合适的唤醒方法。如果只有一个 goroutine 需要被唤醒,使用 Signal 方法;如果所有等待的 goroutine 都需要被唤醒,使用 Broadcast 方法。使用不当可能会导致性能问题或逻辑错误。
  4. 避免死锁:在复杂的并发场景中,要特别注意避免死锁。例如,确保在唤醒 goroutine 后,被唤醒的 goroutine 能够获取到所需的资源并继续执行,而不会因为资源被其他 goroutine 长期占用而再次进入等待状态,形成死锁。

与其他并发原语的比较

  1. channel 比较channel 也可以用于 goroutine 之间的同步和通信。与 sync.Cond 相比,channel 更侧重于数据的传递,而 sync.Cond 更侧重于基于条件的同步。例如,在生产者 - 消费者模型中,channel 可以直接传递数据,而 sync.Cond 需要结合共享数据结构(如队列)来实现相同的功能。channel 的使用相对简单直观,但在一些复杂的条件同步场景下,sync.Cond 可能更灵活。
  2. sync.WaitGroup 比较sync.WaitGroup 主要用于等待一组 goroutine 完成任务。它不涉及条件同步,只是简单地阻塞一个 goroutine,直到所有被 Add 方法计数的 goroutine 调用 Done 方法。而 sync.Cond 是基于条件的同步,用于在某个条件满足时唤醒等待的 goroutine。两者的应用场景有明显的区别。

总结 sync.Cond 的应用场景

  1. 资源池管理:在连接池、线程池等资源池的实现中,sync.Cond 可以用于管理资源的获取和释放。当资源池中的资源耗尽时,获取资源的 goroutine 可以等待,直到有资源被释放。
  2. 任务调度:在任务调度系统中,调度器可以使用 sync.Cond 来通知工作线程有新的任务到达,或者工作线程可以等待直到有任务可处理。
  3. 分布式系统中的协调:在分布式系统中,节点之间可能需要根据某些条件进行同步和协调。sync.Cond 可以用于实现简单的分布式条件同步机制,尽管在实际的分布式系统中,通常会使用更复杂的分布式协调工具(如 ZooKeeper)。

通过以上对 sync.Cond 的详细介绍、示例代码以及与其他并发原语的比较,相信你对 Go 语言中 sync.Cond 的使用和应用场景有了更深入的理解。在实际的并发编程中,合理使用 sync.Cond 可以有效地解决 goroutine 之间的协作问题,提高程序的并发性能和稳定性。