MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Go条件变量的多线程同步

2021-05-105.7k 阅读

Go 并发编程基础

在深入探讨 Go 条件变量的多线程同步之前,我们先来回顾一下 Go 并发编程的一些基础知识。

Go 语言在设计之初就将并发编程作为核心特性之一。其并发模型基于 goroutinechannelgoroutine 是一种轻量级的线程,由 Go 运行时(runtime)管理调度。与操作系统原生线程相比,goroutine 的创建和销毁开销极小,这使得我们可以轻松创建数以万计的 goroutine 来处理并发任务。

package main

import (
    "fmt"
)

func hello() {
    fmt.Println("Hello from goroutine")
}

func main() {
    go hello()
    fmt.Println("Main function")
}

在上述代码中,go hello() 语句创建了一个新的 goroutine 来执行 hello 函数。而主函数 main 本身也是一个 goroutine。在实际运行中,我们会发现 Main function 可能先于 Hello from goroutine 打印出来,这是因为 goroutine 的调度是不确定的,main 函数所在的 goroutine 可能在新创建的 goroutine 执行 hello 函数之前就继续执行并打印了相关内容。

channel 则是用于 goroutine 之间通信的管道。通过 channel,不同的 goroutine 可以安全地传递数据,从而实现数据共享和同步。例如:

package main

import (
    "fmt"
)

func sendData(ch chan int) {
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch)
}

func receiveData(ch chan int) {
    for num := range ch {
        fmt.Println("Received:", num)
    }
}

func main() {
    ch := make(chan int)
    go sendData(ch)
    go receiveData(ch)

    select {}
}

在这段代码中,sendData 函数向 channel 中发送数据,receiveData 函数从 channel 中接收数据。for... range 结构会在 channel 关闭时自动结束循环。最后的 select {} 语句用于阻塞主 goroutine,防止程序过早退出。

多线程同步问题

虽然 goroutinechannel 为我们提供了强大的并发编程能力,但在实际应用中,我们仍然会遇到一些多线程同步问题。

竞争条件(Race Condition)

竞争条件是指当多个 goroutine 同时访问和修改共享资源时,由于执行顺序的不确定性,导致最终结果出现不可预测的情况。例如:

package main

import (
    "fmt"
)

var counter int

func increment() {
    counter++
}

func main() {
    for i := 0; i < 1000; i++ {
        go increment()
    }
    fmt.Println("Counter:", counter)
}

在理想情况下,counter 应该增加到 1000,但实际运行结果往往小于 1000。这是因为 counter++ 并不是一个原子操作,它包含读取 counter 的值、增加 1 以及写回新值这三个步骤。当多个 goroutine 同时执行这三个步骤时,就会出现竞争条件,导致部分增加操作丢失。

死锁(Deadlock)

死锁是指两个或多个 goroutine 相互等待对方释放资源,从而导致所有 goroutine 都无法继续执行的情况。例如:

package main

import (
    "fmt"
)

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)

    go func() {
        ch1 <- 1
        <-ch2
    }()

    go func() {
        ch2 <- 2
        <-ch1
    }()
}

在这段代码中,两个 goroutine 分别向 ch1ch2 发送数据,然后等待从对方的 channel 接收数据。由于没有任何一个 goroutine 先进行接收操作,这就导致了死锁。Go 运行时会检测到这种死锁情况并报错。

条件变量(Condition Variable)概述

为了解决多线程同步问题,Go 语言提供了条件变量(sync.Cond)。条件变量是一种线程同步原语,它允许 goroutine 在满足特定条件时被唤醒。

条件变量通常与互斥锁(sync.Mutex)一起使用。互斥锁用于保护共享资源,确保同一时间只有一个 goroutine 可以访问。而条件变量则用于在共享资源状态发生变化时通知等待的 goroutine

sync.Cond 结构体

sync.Cond 结构体定义在 sync 包中,其定义如下:

type Cond struct {
    noCopy noCopy

    L Locker

    notify  notifyList
    checker copyChecker
}
  • L:一个实现了 Locker 接口的对象,通常是 sync.Mutexsync.RWMutex。它用于保护共享资源,在调用 Cond 的方法之前,必须先获取该锁。
  • notifyList:用于管理等待该条件变量的 goroutine 列表。
  • noCopycopyChecker:用于防止 Cond 对象被复制,以避免出现未定义行为。

sync.Cond 的方法

NewCond 函数

NewCond 函数用于创建一个新的 Cond 对象,其定义如下:

func NewCond(l Locker) *Cond

例如:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
}

在上述代码中,我们创建了一个 sync.Mutex,并将其传递给 NewCond 函数来创建一个 Cond 对象。

Wait 方法

Wait 方法用于等待条件变量被通知。在调用 Wait 方法之前,必须先获取与 Cond 关联的锁。Wait 方法会自动释放锁,并将当前 goroutine 加入等待队列。当 Wait 方法被唤醒时,它会重新获取锁并继续执行。其定义如下:

func (c *Cond) Wait()

示例代码:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    ready := false

    go func() {
        time.Sleep(2 * time.Second)
        mu.Lock()
        ready = true
        fmt.Println("Setting ready to true")
        cond.Broadcast()
        mu.Unlock()
    }()

    mu.Lock()
    for!ready {
        fmt.Println("Waiting for ready to be true")
        cond.Wait()
    }
    fmt.Println("Ready is true, continuing")
    mu.Unlock()
}

在这段代码中,主 goroutine 等待 ready 变量变为 true。另一个 goroutine 在 2 秒后将 ready 设置为 true 并调用 Broadcast 方法通知所有等待的 goroutine。主 goroutine 在被唤醒后检查 ready 变量,确保条件满足后继续执行。

Signal 方法

Signal 方法用于唤醒一个等待条件变量的 goroutine。如果有多个 goroutine 在等待,具体唤醒哪个 goroutine 是不确定的。其定义如下:

func (c *Cond) Signal()

示例代码:

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, cond *sync.Cond, mu *sync.Mutex) {
    mu.Lock()
    fmt.Printf("Worker %d waiting\n", id)
    cond.Wait()
    fmt.Printf("Worker %d awakened\n", id)
    mu.Unlock()
}

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)

    for i := 0; i < 3; i++ {
        go worker(i, cond, &mu)
    }

    time.Sleep(2 * time.Second)
    mu.Lock()
    fmt.Println("Signaling one worker")
    cond.Signal()
    mu.Unlock()

    time.Sleep(2 * time.Second)
    mu.Lock()
    fmt.Println("Signaling another worker")
    cond.Signal()
    mu.Unlock()
}

在上述代码中,我们创建了三个 goroutine 作为工作线程,它们都在等待条件变量。主线程在不同时间调用 Signal 方法,每次唤醒一个等待的工作线程。

Broadcast 方法

Broadcast 方法用于唤醒所有等待条件变量的 goroutine。其定义如下:

func (c *Cond) Broadcast()

示例代码:

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, cond *sync.Cond, mu *sync.Mutex) {
    mu.Lock()
    fmt.Printf("Worker %d waiting\n", id)
    cond.Wait()
    fmt.Printf("Worker %d awakened\n", id)
    mu.Unlock()
}

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)

    for i := 0; i < 3; i++ {
        go worker(i, cond, &mu)
    }

    time.Sleep(2 * time.Second)
    mu.Lock()
    fmt.Println("Broadcasting to all workers")
    cond.Broadcast()
    mu.Unlock()
}

在这段代码中,我们创建了三个工作线程,主线程在 2 秒后调用 Broadcast 方法,唤醒所有等待的工作线程。

使用条件变量解决实际问题

生产者 - 消费者模型

生产者 - 消费者模型是一个经典的多线程同步问题。在这个模型中,生产者 goroutine 生成数据并将其放入缓冲区,消费者 goroutine 从缓冲区中取出数据进行处理。

package main

import (
    "fmt"
    "sync"
    "time"
)

const bufferSize = 5

type Buffer struct {
    data  [bufferSize]int
    count int
    front int
    rear  int
    mu    sync.Mutex
    cond  *sync.Cond
}

func (b *Buffer) Produce(item int) {
    b.mu.Lock()
    for b.count == bufferSize {
        fmt.Println("Buffer is full, producer waiting")
        b.cond.Wait()
    }
    b.data[b.rear] = item
    b.rear = (b.rear + 1) % bufferSize
    b.count++
    fmt.Printf("Produced: %d\n", item)
    b.cond.Signal()
    b.mu.Unlock()
}

func (b *Buffer) Consume() int {
    b.mu.Lock()
    for b.count == 0 {
        fmt.Println("Buffer is empty, consumer waiting")
        b.cond.Wait()
    }
    item := b.data[b.front]
    b.front = (b.front + 1) % bufferSize
    b.count--
    fmt.Printf("Consumed: %d\n", item)
    b.cond.Signal()
    b.mu.Unlock()
    return item
}

func main() {
    buffer := &Buffer{cond: sync.NewCond(&buffer.mu)}

    go func() {
        for i := 0; i < 10; i++ {
            buffer.Produce(i)
            time.Sleep(time.Millisecond * 500)
        }
    }()

    go func() {
        for i := 0; i < 10; i++ {
            buffer.Consume()
            time.Sleep(time.Millisecond * 1000)
        }
    }()

    select {}
}

在上述代码中,Buffer 结构体表示缓冲区,Produce 方法用于生产数据,Consume 方法用于消费数据。当缓冲区满时,生产者 goroutine 等待;当缓冲区空时,消费者 goroutine 等待。通过条件变量和互斥锁,我们实现了生产者和消费者之间的同步。

读写锁与条件变量结合

在一些场景中,我们需要允许多个 goroutine 同时读取共享资源,但只允许一个 goroutine 进行写入操作。这时候可以使用读写锁(sync.RWMutex)与条件变量结合来实现更高效的同步。

package main

import (
    "fmt"
    "sync"
    "time"
)

type Data struct {
    value int
    mu    sync.RWMutex
    cond  *sync.Cond
}

func (d *Data) Read() int {
    d.mu.RLock()
    defer d.mu.RUnlock()
    fmt.Println("Reading value:", d.value)
    return d.value
}

func (d *Data) Write(newValue int) {
    d.mu.Lock()
    for d.value != 0 {
        fmt.Println("Value is not zero, writer waiting")
        d.cond.Wait()
    }
    d.value = newValue
    fmt.Println("Written value:", d.value)
    d.cond.Broadcast()
    d.mu.Unlock()
}

func main() {
    data := &Data{cond: sync.NewCond(&data.mu)}

    go func() {
        for i := 0; i < 3; i++ {
            data.Read()
            time.Sleep(time.Millisecond * 500)
        }
    }()

    go func() {
        for i := 0; i < 3; i++ {
            data.Write((i + 1) * 10)
            time.Sleep(time.Millisecond * 1000)
        }
    }()

    select {}
}

在这段代码中,Data 结构体包含一个共享值、一个读写锁和一个条件变量。Read 方法使用读锁允许多个 goroutine 同时读取,Write 方法使用写锁确保只有一个 goroutine 可以写入。当要写入的值不为零时,写入 goroutine 会等待,直到值为零。写入完成后,通过条件变量通知所有等待的 goroutine

条件变量使用注意事项

  1. 锁的使用:在调用 Cond 的方法(WaitSignalBroadcast)之前,必须先获取与 Cond 关联的锁。并且在 Wait 方法返回后,锁会被重新获取,所以在处理共享资源时要注意锁的持有和释放。
  2. 避免死锁:确保在通知 goroutine 之前,共享资源的状态已经被正确修改,并且等待的 goroutine 有机会获取锁并处理共享资源。例如,在使用 SignalBroadcast 后,应该尽快释放锁,以便等待的 goroutine 能够获取锁并继续执行。
  3. 虚假唤醒:虽然 Go 的 sync.Cond 实现中不太可能出现虚假唤醒,但为了代码的健壮性,在 Wait 方法返回后,应该再次检查条件是否满足。通常使用 for 循环来等待条件满足,如 for!condition { cond.Wait() }

性能考虑

  1. 通知频率:频繁调用 SignalBroadcast 方法可能会带来一定的性能开销,尤其是在有大量等待 goroutine 的情况下。因此,应该尽量减少不必要的通知,只有在共享资源状态发生真正影响等待 goroutine 的变化时才进行通知。
  2. 锁的粒度:与 Cond 关联的锁的粒度会影响程序的性能。如果锁的粒度太大,会导致并发度降低;如果锁的粒度太小,可能会增加锁的竞争和管理开销。需要根据具体业务场景来合理设置锁的粒度。

通过深入理解 Go 条件变量的原理和使用方法,并结合实际场景进行优化,我们可以有效地解决多线程同步问题,编写出高效、健壮的并发程序。在实际开发中,需要不断实践和总结经验,以充分发挥 Go 语言并发编程的优势。