MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Go 语言条件变量的使用与线程同步

2024-03-183.2k 阅读

Go 语言中的并发编程基础

在深入探讨 Go 语言条件变量的使用与线程同步之前,我们先来回顾一下 Go 语言并发编程的基础概念。

Goroutine

Goroutine 是 Go 语言中实现并发的核心机制。它类似于线程,但更轻量级。与传统线程相比,创建和销毁 Goroutine 的开销非常小。一个程序可以轻松创建数以万计的 Goroutine。

package main

import (
    "fmt"
    "time"
)

func say(s string) {
    for i := 0; i < 5; i++ {
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func main() {
    go say("world")
    say("hello")
}

在上述代码中,go say("world") 启动了一个新的 Goroutine 来执行 say 函数,而 say("hello") 则在主 Goroutine 中执行。这两个函数并发执行,最终输出 helloworld 交替出现的结果。

通道(Channel)

通道是 Goroutine 之间进行通信的机制。它提供了一种类型安全的方式来传递数据。通道分为有缓冲通道和无缓冲通道。

无缓冲通道

无缓冲通道在发送和接收操作时会阻塞,直到对应的接收或发送操作准备好。

package main

import (
    "fmt"
)

func main() {
    ch := make(chan int)
    go func() {
        ch <- 42
    }()
    value := <-ch
    fmt.Println(value)
}

在这段代码中,匿名 Goroutine 向通道 ch 发送值 42,主 Goroutine 从通道 ch 接收这个值。如果没有主 Goroutine 的接收操作,发送操作会一直阻塞。

有缓冲通道

有缓冲通道在缓冲区未满时,发送操作不会阻塞;在缓冲区不为空时,接收操作不会阻塞。

package main

import (
    "fmt"
)

func main() {
    ch := make(chan int, 2)
    ch <- 10
    ch <- 20
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}

这里创建了一个容量为 2 的有缓冲通道 ch。可以连续发送两个值而不会阻塞,然后依次接收这两个值。

线程同步的需求

在并发编程中,多个 Goroutine 可能会访问共享资源。如果不对这些访问进行适当的同步控制,就会导致数据竞争和不一致的问题。

数据竞争的例子

package main

import (
    "fmt"
    "sync"
)

var counter int

func increment(wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < 1000; i++ {
        counter++
    }
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go increment(&wg)
    }
    wg.Wait()
    fmt.Println("Final counter value:", counter)
}

在上述代码中,10 个 Goroutine 同时对 counter 进行递增操作。由于没有同步机制,不同 Goroutine 的操作可能会相互干扰,导致最终的 counter 值并非预期的 10000。

互斥锁(Mutex)

互斥锁是一种常用的同步工具,用于保护共享资源,确保同一时间只有一个 Goroutine 可以访问该资源。

package main

import (
    "fmt"
    "sync"
)

var counter int
var mu sync.Mutex

func increment(wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < 1000; i++ {
        mu.Lock()
        counter++
        mu.Unlock()
    }
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go increment(&wg)
    }
    wg.Wait()
    fmt.Println("Final counter value:", counter)
}

在这段代码中,通过 mu.Lock()mu.Unlock() 来保护对 counter 的访问。在任何时刻,只有获取到锁的 Goroutine 可以修改 counter,从而避免了数据竞争。

条件变量(Cond)

虽然互斥锁能解决基本的同步问题,但在一些场景下,我们需要更复杂的同步机制,这就是条件变量发挥作用的地方。

条件变量的概念

条件变量是一种允许 Goroutine 等待特定条件满足的同步原语。它通常与互斥锁一起使用。当一个 Goroutine 等待某个条件时,它会释放持有的互斥锁并进入睡眠状态,直到其他 Goroutine 通知它条件已满足。

创建条件变量

在 Go 语言中,可以通过 sync.NewCond 函数来创建条件变量。

package main

import (
    "fmt"
    "sync"
)

func main() {
    mu := sync.Mutex{}
    cond := sync.NewCond(&mu)
    // 后续使用cond进行同步操作
}

这里先创建了一个互斥锁 mu,然后基于这个互斥锁创建了条件变量 cond

等待条件(Wait 方法)

Goroutine 通过调用条件变量的 Wait 方法来等待条件满足。Wait 方法会自动释放关联的互斥锁,并将 Goroutine 阻塞。当 Wait 方法返回时,它会重新获取互斥锁。

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    mu := sync.Mutex{}
    cond := sync.NewCond(&mu)
    ready := false

    go func() {
        time.Sleep(2 * time.Second)
        mu.Lock()
        ready = true
        fmt.Println("Setting ready to true")
        cond.Broadcast()
        mu.Unlock()
    }()

    mu.Lock()
    for!ready {
        fmt.Println("Waiting for ready to be true")
        cond.Wait()
    }
    fmt.Println("Ready is true, continuing")
    mu.Unlock()
}

在上述代码中,有一个后台 Goroutine 会在 2 秒后将 ready 设置为 true 并调用 cond.Broadcast() 通知所有等待的 Goroutine。主 Goroutine 在获取互斥锁后,通过 for!ready 循环检查条件,调用 cond.Wait() 等待条件满足。当 ready 变为 true 时,主 Goroutine 会被唤醒并继续执行。

通知条件(Signal 和 Broadcast 方法)

  • Signal 方法:唤醒一个等待在条件变量上的 Goroutine。如果有多个 Goroutine 在等待,只会唤醒其中一个。
  • Broadcast 方法:唤醒所有等待在条件变量上的 Goroutine。
package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, cond *sync.Cond) {
    cond.L.Lock()
    fmt.Printf("Worker %d waiting\n", id)
    cond.Wait()
    fmt.Printf("Worker %d woke up\n", id)
    cond.L.Unlock()
}

func main() {
    mu := sync.Mutex{}
    cond := sync.NewCond(&mu)

    for i := 1; i <= 3; i++ {
        go worker(i, cond)
    }

    time.Sleep(2 * time.Second)
    mu.Lock()
    fmt.Println("Broadcasting to all workers")
    cond.Broadcast()
    mu.Unlock()

    time.Sleep(2 * time.Second)
    mu.Lock()
    fmt.Println("Signaling to one worker")
    cond.Signal()
    mu.Unlock()

    time.Sleep(2 * time.Second)
}

在这个例子中,创建了 3 个工作 Goroutine 等待在条件变量上。2 秒后,先调用 Broadcast 唤醒所有等待的 Goroutine,再过 2 秒,调用 Signal 唤醒一个等待的 Goroutine。

条件变量在生产者 - 消费者模型中的应用

生产者 - 消费者模型是并发编程中常见的模式,条件变量在其中起着关键作用。

简单的生产者 - 消费者模型

package main

import (
    "fmt"
    "sync"
)

type Queue struct {
    data []int
    size int
    mu   sync.Mutex
    cond *sync.Cond
}

func NewQueue(size int) *Queue {
    q := &Queue{
        data: make([]int, 0, size),
        size: size,
    }
    q.cond = sync.NewCond(&q.mu)
    return q
}

func (q *Queue) Enqueue(item int) {
    q.mu.Lock()
    for len(q.data) == q.size {
        q.cond.Wait()
    }
    q.data = append(q.data, item)
    fmt.Printf("Enqueued %d\n", item)
    q.cond.Signal()
    q.mu.Unlock()
}

func (q *Queue) Dequeue() int {
    q.mu.Lock()
    for len(q.data) == 0 {
        q.cond.Wait()
    }
    item := q.data[0]
    q.data = q.data[1:]
    fmt.Printf("Dequeued %d\n", item)
    q.cond.Signal()
    q.mu.Unlock()
    return item
}

func main() {
    queue := NewQueue(3)

    var wg sync.WaitGroup
    wg.Add(2)

    go func() {
        defer wg.Done()
        for i := 1; i <= 5; i++ {
            queue.Enqueue(i)
        }
    }()

    go func() {
        defer wg.Done()
        for i := 1; i <= 5; i++ {
            queue.Dequeue()
        }
    }()

    wg.Wait()
}

在这个生产者 - 消费者模型中,Queue 结构体表示一个队列,包含数据、队列大小、互斥锁和条件变量。Enqueue 方法用于向队列中添加元素,当队列满时,生产者 Goroutine 会等待;Dequeue 方法用于从队列中取出元素,当队列为空时,消费者 Goroutine 会等待。通过条件变量的 WaitSignal 方法实现了生产者和消费者之间的同步。

改进的生产者 - 消费者模型(带缓冲区)

package main

import (
    "fmt"
    "sync"
    "time"
)

type BufferedQueue struct {
    data     []int
    capacity int
    count    int
    head     int
    tail     int
    mu       sync.Mutex
    notFull  *sync.Cond
    notEmpty *sync.Cond
}

func NewBufferedQueue(capacity int) *BufferedQueue {
    q := &BufferedQueue{
        data:     make([]int, capacity),
        capacity: capacity,
    }
    q.notFull = sync.NewCond(&q.mu)
    q.notEmpty = sync.NewCond(&q.mu)
    return q
}

func (q *BufferedQueue) Enqueue(item int) {
    q.mu.Lock()
    for q.count == q.capacity {
        q.notFull.Wait()
    }
    q.data[q.tail] = item
    q.tail = (q.tail + 1) % q.capacity
    q.count++
    fmt.Printf("Enqueued %d\n", item)
    q.notEmpty.Signal()
    q.mu.Unlock()
}

func (q *BufferedQueue) Dequeue() int {
    q.mu.Lock()
    for q.count == 0 {
        q.notEmpty.Wait()
    }
    item := q.data[q.head]
    q.head = (q.head + 1) % q.capacity
    q.count--
    fmt.Printf("Dequeued %d\n", item)
    q.notFull.Signal()
    q.mu.Unlock()
    return item
}

func main() {
    queue := NewBufferedQueue(3)

    var wg sync.WaitGroup
    wg.Add(2)

    go func() {
        defer wg.Done()
        for i := 1; i <= 5; i++ {
            queue.Enqueue(i)
            time.Sleep(100 * time.Millisecond)
        }
    }()

    go func() {
        defer wg.Done()
        for i := 1; i <= 5; i++ {
            queue.Dequeue()
            time.Sleep(200 * time.Millisecond)
        }
    }()

    wg.Wait()
}

这个改进版本的生产者 - 消费者模型使用环形缓冲区来提高效率。notFull 条件变量用于通知生产者队列有空间可用,notEmpty 条件变量用于通知消费者队列有数据可消费。通过这种方式,进一步优化了生产者和消费者之间的同步与协作。

条件变量与其他同步原语的对比

与互斥锁的对比

  • 功能侧重点:互斥锁主要用于保护共享资源,防止多个 Goroutine 同时访问导致数据竞争。而条件变量更侧重于协调 Goroutine 之间的行为,让 Goroutine 等待特定条件满足。
  • 使用方式:互斥锁通常在访问共享资源前后直接加锁和解锁。条件变量需要与互斥锁配合使用,在等待条件时释放互斥锁,被唤醒后重新获取互斥锁。

与通道的对比

  • 通信方式:通道主要用于 Goroutine 之间的数据传递,通过发送和接收操作来同步。条件变量则是基于事件通知的机制,用于等待某个条件满足。
  • 适用场景:如果需要在 Goroutine 之间传递数据并同步,通道是较好的选择。如果只是需要等待某个条件,条件变量更合适。例如,在生产者 - 消费者模型中,通道可用于传递数据,而条件变量可用于控制生产和消费的节奏。

条件变量使用中的常见问题与注意事项

虚假唤醒

在调用 Wait 方法时,可能会出现虚假唤醒的情况,即没有其他 Goroutine 调用 SignalBroadcastWait 方法也会返回。为了避免这种情况,应该在循环中检查条件,而不是只检查一次。

// 正确的方式
mu.Lock()
for!ready {
    cond.Wait()
}
mu.Unlock()

// 错误的方式
mu.Lock()
if!ready {
    cond.Wait()
}
mu.Unlock()

通过使用 for 循环,即使出现虚假唤醒,也能确保只有在条件真正满足时才继续执行。

死锁问题

在使用条件变量时,如果没有正确处理互斥锁的获取和释放,可能会导致死锁。例如,在调用 SignalBroadcast 时没有持有互斥锁,或者在等待条件时没有释放互斥锁。

// 错误示例,可能导致死锁
mu := sync.Mutex{}
cond := sync.NewCond(&mu)

go func() {
    mu.Lock()
    // 这里没有释放互斥锁就调用Wait,可能导致死锁
    cond.Wait()
    mu.Unlock()
}()

// 主Goroutine
mu.Lock()
// 这里没有获取互斥锁就调用Broadcast,可能导致死锁
cond.Broadcast()
mu.Unlock()

正确的做法是在调用 Wait 之前获取互斥锁,并在 Wait 内部释放和重新获取;在调用 SignalBroadcast 时持有互斥锁。

条件变量的作用域

条件变量应该与它所保护的共享资源具有相同的作用域。如果条件变量的生命周期结束,而还有 Goroutine 在等待它,会导致未定义行为。

func someFunction() {
    mu := sync.Mutex{}
    cond := sync.NewCond(&mu)
    ready := false

    go func() {
        mu.Lock()
        for!ready {
            cond.Wait()
        }
        mu.Unlock()
    }()

    // 这里不应该提前销毁cond和mu,否则等待的Goroutine会有问题
    // 确保在所有相关Goroutine完成后再释放资源
}

要确保在所有依赖条件变量的 Goroutine 完成工作后,再释放相关的资源。

复杂场景下条件变量的应用

多阶段任务同步

假设我们有一个复杂的任务,分为多个阶段,每个阶段由不同的 Goroutine 执行,并且下一阶段需要等待上一阶段完成。

package main

import (
    "fmt"
    "sync"
    "time"
)

type TaskStage struct {
    mu    sync.Mutex
    cond  *sync.Cond
    ready bool
}

func NewTaskStage() *TaskStage {
    ts := &TaskStage{}
    ts.cond = sync.NewCond(&ts.mu)
    return ts
}

func (ts *TaskStage) WaitForStage() {
    ts.mu.Lock()
    for!ts.ready {
        ts.cond.Wait()
    }
    ts.mu.Unlock()
}

func (ts *TaskStage) MarkStageComplete() {
    ts.mu.Lock()
    ts.ready = true
    ts.cond.Broadcast()
    ts.mu.Unlock()
}

func main() {
    stage1 := NewTaskStage()
    stage2 := NewTaskStage()

    var wg sync.WaitGroup
    wg.Add(3)

    go func() {
        defer wg.Done()
        fmt.Println("Starting stage 1")
        time.Sleep(2 * time.Second)
        fmt.Println("Completing stage 1")
        stage1.MarkStageComplete()
    }()

    go func() {
        defer wg.Done()
        stage1.WaitForStage()
        fmt.Println("Starting stage 2")
        time.Sleep(2 * time.Second)
        fmt.Println("Completing stage 2")
        stage2.MarkStageComplete()
    }()

    go func() {
        defer wg.Done()
        stage2.WaitForStage()
        fmt.Println("Starting final stage")
        time.Sleep(2 * time.Second)
        fmt.Println("Final stage completed")
    }()

    wg.Wait()
}

在这个例子中,通过条件变量实现了任务不同阶段之间的同步。每个阶段完成后,通过 MarkStageComplete 方法通知下一个阶段可以开始。

资源池管理

资源池是一种常见的设计模式,用于管理有限的资源。条件变量可以用于控制资源的获取和释放。

package main

import (
    "fmt"
    "sync"
    "time"
)

type Resource struct {
    id int
}

type ResourcePool struct {
    resources []*Resource
    available chan struct{}
    mu        sync.Mutex
    cond      *sync.Cond
}

func NewResourcePool(size int) *ResourcePool {
    rp := &ResourcePool{
        resources: make([]*Resource, size),
        available: make(chan struct{}, size),
    }
    for i := 0; i < size; i++ {
        rp.resources[i] = &Resource{id: i}
        rp.available <- struct{}{}
    }
    rp.cond = sync.NewCond(&rp.mu)
    return rp
}

func (rp *ResourcePool) GetResource() *Resource {
    <-rp.available
    rp.mu.Lock()
    defer rp.mu.Unlock()
    for len(rp.resources) == 0 {
        rp.cond.Wait()
    }
    resource := rp.resources[0]
    rp.resources = rp.resources[1:]
    return resource
}

func (rp *ResourcePool) ReturnResource(resource *Resource) {
    rp.mu.Lock()
    rp.resources = append(rp.resources, resource)
    rp.cond.Signal()
    rp.mu.Unlock()
    rp.available <- struct{}{}
}

func main() {
    pool := NewResourcePool(3)

    var wg sync.WaitGroup
    wg.Add(5)

    for i := 0; i < 5; i++ {
        go func(id int) {
            defer wg.Done()
            resource := pool.GetResource()
            fmt.Printf("Goroutine %d got resource %d\n", id, resource.id)
            time.Sleep(1 * time.Second)
            fmt.Printf("Goroutine %d returning resource %d\n", id, resource.id)
            pool.ReturnResource(resource)
        }(i)
    }

    wg.Wait()
}

在这个资源池管理的例子中,ResourcePool 结构体表示资源池,通过条件变量和通道来控制资源的获取和释放。当资源池为空时,获取资源的 Goroutine 会等待;当有资源被归还时,等待的 Goroutine 会被唤醒。

总结条件变量的使用要点

  1. 与互斥锁配合:条件变量必须与互斥锁一起使用,在等待条件时释放互斥锁,被唤醒后重新获取互斥锁。
  2. 循环检查条件:为了避免虚假唤醒,应该在循环中检查条件,而不是只检查一次。
  3. 正确调用通知方法:根据实际需求选择 SignalBroadcast 方法,并且在调用时要持有互斥锁。
  4. 注意作用域:确保条件变量的作用域与它所保护的共享资源一致,避免提前销毁导致未定义行为。

通过合理使用条件变量,Go 语言开发者可以解决复杂的线程同步问题,实现高效、安全的并发程序。无论是简单的生产者 - 消费者模型,还是复杂的多阶段任务同步和资源池管理,条件变量都能发挥重要作用。在实际应用中,要深入理解其原理,并注意避免常见的问题,以充分发挥条件变量在并发编程中的优势。