MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Go条件变量使用的实用指南

2024-01-276.4k 阅读

Go 条件变量概述

在 Go 语言的并发编程中,条件变量(sync.Cond)是一个非常重要的同步原语。它用于协调多个 goroutine 之间的同步,通常与互斥锁(sync.Mutex)配合使用。条件变量允许一个或多个 goroutine 等待特定条件的发生,而不会占用大量的 CPU 资源。

条件变量的主要作用是在某个条件满足时通知等待的 goroutine。这在许多场景下都非常有用,比如生产者 - 消费者模型,其中消费者 goroutine 需要等待生产者 goroutine 生产出数据后才能继续消费。

条件变量的创建与基本结构

在 Go 中,sync.Cond 结构体的定义如下:

type Cond struct {
    noCopy noCopy
    L Locker
    notify  notifyList
    checker copyChecker
}

其中,L 字段是一个实现了 Locker 接口的锁,通常是 sync.Mutexsync.RWMutex。这个锁用于保护条件变量相关的数据,确保在修改和检查条件时的线程安全。

要创建一个条件变量,需要使用 sync.NewCond 函数,示例如下:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)

    // 后续可以在此处添加使用条件变量的逻辑
}

在上述代码中,首先创建了一个 sync.Mutex 实例 mu,然后通过 sync.NewCond 函数以 mu 为参数创建了一个条件变量 cond

条件变量的主要方法

Wait 方法

Wait 方法用于阻塞当前 goroutine,直到条件变量被通知。调用 Wait 方法前,必须先获取相关的锁。在调用 Wait 时,Wait 方法会自动释放锁,然后将当前 goroutine 放入等待队列。当该 goroutine 被唤醒时,Wait 方法会重新获取锁,然后返回。

示例代码如下:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    ready := false

    go func() {
        time.Sleep(2 * time.Second)
        mu.Lock()
        ready = true
        fmt.Println("Setting ready to true")
        cond.Broadcast()
        mu.Unlock()
    }()

    mu.Lock()
    for!ready {
        fmt.Println("Waiting for ready to be true")
        cond.Wait()
    }
    fmt.Println("Ready is true, continuing")
    mu.Unlock()
}

在这段代码中,有一个后台 goroutine 在 2 秒后将 ready 变量设置为 true,并通过 cond.Broadcast() 通知所有等待的 goroutine。主线程在获取锁后,通过 for!ready 循环检查 ready 变量,当 readyfalse 时调用 cond.Wait() 等待通知。一旦收到通知,Wait 方法返回,主线程重新获取锁,继续执行后续逻辑。

使用 for 循环来检查条件是非常重要的,因为在某些情况下,goroutine 可能会被虚假唤醒(spurious wakeup)。虚假唤醒是指在没有任何线程调用 SignalBroadcast 的情况下,等待的 goroutine 被唤醒。通过使用 for 循环重新检查条件,可以确保只有在条件真正满足时才继续执行。

Signal 方法

Signal 方法用于唤醒等待队列中的一个 goroutine。如果有多个 goroutine 在等待,那么只会有一个被唤醒。

示例代码如下:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    ready := false

    go func() {
        mu.Lock()
        for i := 0; i < 3; i++ {
            for!ready {
                fmt.Printf("Goroutine %d waiting\n", i)
                cond.Wait()
            }
            fmt.Printf("Goroutine %d woke up\n", i)
            ready = false
        }
        mu.Unlock()
    }()

    time.Sleep(1 * time.Second)
    mu.Lock()
    ready = true
    fmt.Println("Signaling one goroutine")
    cond.Signal()
    mu.Unlock()

    time.Sleep(1 * time.Second)
    mu.Lock()
    ready = true
    fmt.Println("Signaling another goroutine")
    cond.Signal()
    mu.Unlock()
}

在这个示例中,启动了一个 goroutine,它在循环中等待 ready 条件为 true。主线程在 1 秒后将 ready 设置为 true,并调用 cond.Signal() 唤醒一个等待的 goroutine。每次唤醒后,又将 ready 设置为 false,模拟不同的条件变化。主线程再次等待 1 秒后,重复相同的操作,再次唤醒一个等待的 goroutine。

Broadcast 方法

Broadcast 方法用于唤醒等待队列中的所有 goroutine。所有被唤醒的 goroutine 都会尝试重新获取锁,获取到锁的 goroutine 会继续执行,而其他 goroutine 则会继续等待锁的释放。

示例代码如下:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    ready := false

    for i := 0; i < 3; i++ {
        go func(id int) {
            mu.Lock()
            for!ready {
                fmt.Printf("Goroutine %d waiting\n", id)
                cond.Wait()
            }
            fmt.Printf("Goroutine %d woke up\n", id)
            mu.Unlock()
        }(i)
    }

    time.Sleep(1 * time.Second)
    mu.Lock()
    ready = true
    fmt.Println("Broadcasting to all goroutines")
    cond.Broadcast()
    mu.Unlock()
}

在这个例子中,启动了 3 个 goroutine,它们都在等待 ready 条件为 true。主线程在 1 秒后将 ready 设置为 true,并调用 cond.Broadcast() 通知所有等待的 goroutine。所有被唤醒的 goroutine 都会重新获取锁并继续执行。

条件变量在生产者 - 消费者模型中的应用

生产者 - 消费者模型是并发编程中一个经典的模式,条件变量在这个模型中有着广泛的应用。

简单的生产者 - 消费者模型示例

package main

import (
    "fmt"
    "sync"
)

type Queue struct {
    data []int
    mu   sync.Mutex
    cond *sync.Cond
}

func NewQueue() *Queue {
    q := &Queue{}
    q.cond = sync.NewCond(&q.mu)
    return q
}

func (q *Queue) Enqueue(item int) {
    q.mu.Lock()
    q.data = append(q.data, item)
    fmt.Printf("Enqueued %d\n", item)
    q.cond.Broadcast()
    q.mu.Unlock()
}

func (q *Queue) Dequeue() int {
    q.mu.Lock()
    for len(q.data) == 0 {
        q.cond.Wait()
    }
    item := q.data[0]
    q.data = q.data[1:]
    fmt.Printf("Dequeued %d\n", item)
    q.mu.Unlock()
    return item
}

func main() {
    queue := NewQueue()

    go func() {
        for i := 1; i <= 5; i++ {
            queue.Enqueue(i)
        }
    }()

    go func() {
        for i := 0; i < 5; i++ {
            queue.Dequeue()
        }
    }()

    select {}
}

在上述代码中,定义了一个 Queue 结构体,其中包含一个整数切片 data 用于存储数据,一个互斥锁 mu 和一个条件变量 condEnqueue 方法用于向队列中添加元素,添加后通过 cond.Broadcast() 通知所有等待的消费者。Dequeue 方法在队列中没有元素时调用 cond.Wait() 等待生产者添加元素。当有元素时,从队列中取出元素并返回。

改进的生产者 - 消费者模型:带缓冲区限制

package main

import (
    "fmt"
    "sync"
)

type BoundedQueue struct {
    data  []int
    size  int
    mu    sync.Mutex
    cond  *sync.Cond
}

func NewBoundedQueue(size int) *BoundedQueue {
    q := &BoundedQueue{
        size: size,
    }
    q.cond = sync.NewCond(&q.mu)
    return q
}

func (q *BoundedQueue) Enqueue(item int) {
    q.mu.Lock()
    for len(q.data) == q.size {
        fmt.Println("Queue is full, waiting")
        q.cond.Wait()
    }
    q.data = append(q.data, item)
    fmt.Printf("Enqueued %d\n", item)
    q.cond.Broadcast()
    q.mu.Unlock()
}

func (q *BoundedQueue) Dequeue() int {
    q.mu.Lock()
    for len(q.data) == 0 {
        fmt.Println("Queue is empty, waiting")
        q.cond.Wait()
    }
    item := q.data[0]
    q.data = q.data[1:]
    fmt.Printf("Dequeued %d\n", item)
    q.cond.Broadcast()
    q.mu.Unlock()
    return item
}

func main() {
    queue := NewBoundedQueue(3)

    go func() {
        for i := 1; i <= 5; i++ {
            queue.Enqueue(i)
        }
    }()

    go func() {
        for i := 0; i < 5; i++ {
            queue.Dequeue()
        }
    }()

    select {}
}

这个改进版本的队列增加了缓冲区大小的限制。当队列满时,生产者 goroutine 会调用 cond.Wait() 等待消费者从队列中取出元素。当队列空时,消费者 goroutine 会调用 cond.Wait() 等待生产者添加元素。无论是生产者还是消费者在操作后都会通过 cond.Broadcast() 通知对方。

条件变量与 Channel 的比较

在 Go 语言中,Channel 也是一种常用的同步机制,它与条件变量在功能上有一些重叠,但也有各自的特点。

功能特点

  • Channel:更侧重于数据的传递,同时也可以用于同步。通过 Channel 发送和接收数据时,发送方和接收方会自动同步。Channel 本身就是线程安全的,不需要额外的锁。例如:
package main

import (
    "fmt"
)

func main() {
    ch := make(chan int)

    go func() {
        ch <- 42
    }()

    value := <-ch
    fmt.Println("Received:", value)
}
  • 条件变量:主要用于协调多个 goroutine 之间的同步,侧重于在某个条件满足时通知等待的 goroutine。条件变量通常需要与互斥锁配合使用,以确保对共享数据的安全访问。

使用场景

  • Channel:适用于需要在 goroutine 之间传递数据的场景,例如简单的生产者 - 消费者模型可以直接使用 Channel 实现。
package main

import (
    "fmt"
)

func producer(ch chan<- int) {
    for i := 1; i <= 5; i++ {
        ch <- i
    }
    close(ch)
}

func consumer(ch <-chan int) {
    for value := range ch {
        fmt.Printf("Consumed %d\n", value)
    }
}

func main() {
    ch := make(chan int)

    go producer(ch)
    go consumer(ch)

    select {}
}
  • 条件变量:当需要更复杂的同步逻辑,例如在多个条件下进行等待和通知,或者需要在等待条件时对共享数据进行复杂操作时,条件变量更为合适。比如在一个需要根据不同条件进行不同操作的生产者 - 消费者模型中,条件变量可以更好地满足需求。

条件变量使用的常见问题与注意事项

忘记获取锁

在调用条件变量的 WaitSignalBroadcast 方法前,必须先获取相关的锁。否则,会导致未定义行为或竞态条件。

package main

import (
    "fmt"
    "sync"
)

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    ready := false

    go func() {
        // 这里忘记获取锁,会导致问题
        // 直接调用 cond.Signal()
        // 正确做法应该是先 mu.Lock()
        // 然后 cond.Signal()
        // 最后 mu.Unlock()
        ready = true
        cond.Signal()
    }()

    mu.Lock()
    for!ready {
        cond.Wait()
    }
    fmt.Println("Ready is true, continuing")
    mu.Unlock()
}

在上述错误示例中,后台 goroutine 在调用 cond.Signal() 前没有获取锁,这可能导致主线程中的 cond.Wait() 无法正确响应信号,引发竞态条件。

虚假唤醒问题

如前文所述,goroutine 可能会被虚假唤醒。因此,在调用 Wait 方法时,必须使用 for 循环来重新检查条件,确保条件真正满足。

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    ready := false

    go func() {
        time.Sleep(2 * time.Second)
        mu.Lock()
        ready = true
        fmt.Println("Setting ready to true")
        cond.Broadcast()
        mu.Unlock()
    }()

    mu.Lock()
    // 错误做法,没有使用 for 循环检查条件
    // if!ready {
    //     cond.Wait()
    // }
    // 正确做法
    for!ready {
        fmt.Println("Waiting for ready to be true")
        cond.Wait()
    }
    fmt.Println("Ready is true, continuing")
    mu.Unlock()
}

在错误示例中,如果发生虚假唤醒,if 语句不会再次检查 ready 条件,可能导致程序逻辑错误。而使用 for 循环则可以避免这个问题。

锁的粒度控制

在使用条件变量时,需要合理控制锁的粒度。锁的持有时间过长可能会导致性能问题,而锁的粒度太小可能无法保证数据的一致性。

package main

import (
    "fmt"
    "sync"
    "time"
)

type SharedData struct {
    value int
    mu    sync.Mutex
    cond  *sync.Cond
}

func NewSharedData() *SharedData {
    sd := &SharedData{}
    sd.cond = sync.NewCond(&sd.mu)
    return sd
}

func (sd *SharedData) UpdateValue(newValue int) {
    sd.mu.Lock()
    sd.value = newValue
    fmt.Printf("Updated value to %d\n", newValue)
    sd.cond.Broadcast()
    // 错误做法,锁持有时间过长
    // time.Sleep(5 * time.Second)
    // 正确做法,尽快释放锁
    sd.mu.Unlock()
    // 如果有其他非共享数据操作,可以在此处进行
    time.Sleep(5 * time.Second)
}

func (sd *SharedData) ReadValue() int {
    sd.mu.Lock()
    for sd.value == 0 {
        sd.cond.Wait()
    }
    value := sd.value
    sd.mu.Unlock()
    return value
}

func main() {
    sharedData := NewSharedData()

    go func() {
        sharedData.UpdateValue(42)
    }()

    result := sharedData.ReadValue()
    fmt.Printf("Read value: %d\n", result)
}

在上述示例中,如果在 UpdateValue 方法中 Broadcast 之后长时间持有锁(如错误做法中的 time.Sleep(5 * time.Second)),会导致其他等待锁的 goroutine 长时间阻塞,降低程序性能。正确的做法是尽快释放锁,在锁外部进行非共享数据的操作。

总结

Go 语言的条件变量是一种强大的同步原语,它为 goroutine 之间的复杂同步提供了有效的解决方案。通过与互斥锁配合使用,条件变量可以实现诸如生产者 - 消费者模型等常见的并发模式。在使用条件变量时,需要注意获取锁、处理虚假唤醒以及合理控制锁的粒度等问题。同时,要根据具体的应用场景,合理选择条件变量或 Channel 等同步机制,以实现高效、健壮的并发程序。希望通过本文的介绍和示例,读者能够深入理解并熟练运用 Go 语言的条件变量进行并发编程。