MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Go了解协程的工作原理

2023-02-093.9k 阅读

Go 协程基础概念

什么是协程

在 Go 语言中,协程(goroutine)是一种轻量级的线程执行单元。与操作系统线程不同,协程由 Go 运行时(runtime)管理,而不是操作系统内核。这使得创建和销毁协程的开销极小,能够轻松创建成千上万的协程,用于并发编程。

协程与线程的区别

  1. 资源开销
    • 线程:操作系统线程通常需要较大的栈空间(一般为几 MB),创建和销毁线程的系统开销较大,因为涉及操作系统内核的调度和资源分配。
    • 协程:Go 协程的栈空间初始时非常小(通常为 2KB 左右),并且可以根据需要动态增长和收缩。创建和销毁协程几乎没有什么开销,由 Go 运行时在用户态进行管理,无需内核参与。
  2. 调度方式
    • 线程:操作系统采用抢占式调度,内核决定哪个线程何时运行以及运行多长时间。这种调度方式可能导致线程在执行关键操作时被突然中断,需要复杂的同步机制来保证数据一致性。
    • 协程:Go 协程采用协作式调度,协程会主动让出执行权。当一个协程执行如 I/O 操作、系统调用、调用 runtime.Gosched() 函数等操作时,它会暂停执行,让其他协程有机会运行。

简单的协程示例

下面通过一个简单的代码示例来展示如何在 Go 中创建和运行协程:

package main

import (
    "fmt"
    "time"
)

func printNumbers() {
    for i := 1; i <= 5; i++ {
        fmt.Printf("Number: %d\n", i)
        time.Sleep(100 * time.Millisecond)
    }
}

func printLetters() {
    for i := 'a'; i <= 'e'; i++ {
        fmt.Printf("Letter: %c\n", i)
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    go printNumbers()
    go printLetters()

    time.Sleep(1000 * time.Millisecond)
}

在上述代码中,main 函数中通过 go 关键字启动了两个协程,分别执行 printNumbersprintLetters 函数。time.Sleep 函数用于模拟一些工作,防止主线程过早退出。这两个协程并发执行,交替输出数字和字母。

Go 协程调度器

调度器的组成

Go 协程调度器主要由三个部分组成:M:N 调度模型、Goroutine 队列和调度器的运行机制。

  1. M:N 调度模型
    • Go 语言采用 M:N 调度模型,即 N 个协程映射到 M 个操作系统线程上(N >> M)。这种模型结合了 1:1 模型(每个用户线程映射到一个内核线程)和 N:1 模型(多个用户线程映射到一个内核线程)的优点。它既能利用多核 CPU 的并行性,又能通过协程的轻量级特性高效地管理大量并发任务。
    • 在 Go 调度器中,M 代表操作系统线程(Machine),N 代表协程(Goroutine)。多个协程可以复用少量的操作系统线程,减少了线程创建和上下文切换的开销。
  2. Goroutine 队列
    • 全局队列:Go 运行时维护一个全局的协程队列。当一个新的协程被创建时,它有可能被放入全局队列中。当一个 M 线程在本地队列中找不到可运行的协程时,它会尝试从全局队列中获取协程来执行。
    • 本地队列:每个 M 线程都有一个本地的协程队列。当一个协程被创建时,优先被放入创建它的 M 线程的本地队列中。这样,M 线程可以快速地从本地队列中获取协程执行,减少了锁的竞争。当本地队列满了或者需要平衡负载时,协程会被转移到全局队列。
  3. 调度器的运行机制
    • 创建协程:当通过 go 关键字创建一个新的协程时,Go 运行时会为其分配一个唯一的标识符(GID),并初始化协程的栈空间等资源。然后,协程会被放入创建它的 M 线程的本地队列或者全局队列。
    • 调度协程:每个 M 线程会不断地从本地队列中取出协程并执行。当本地队列为空时,M 线程会尝试从全局队列中获取协程。如果全局队列也为空,M 线程会尝试从其他 M 线程的本地队列中偷取一部分协程(工作窃取算法)。
    • 协程暂停与恢复:当一个协程执行到如 I/O 操作、系统调用等阻塞操作时,它会暂停执行,并将自己从 M 线程的执行队列中移除。然后,M 线程会从队列中取出其他协程继续执行。当阻塞操作完成后,暂停的协程会被重新放入队列,等待再次被调度执行。

调度器的关键数据结构

  1. Goroutine 结构体
    type g struct {
        stack       stack   // 协程栈
        stackguard0 uintptr // 栈保护边界
        stackguard1 uintptr // 栈保护边界(不同架构下使用)
        _panic      *_panic // 当前协程的 panic 链表
        _defer      *_defer // 当前协程的 defer 链表
        m           *m      // 绑定的 M 线程
        sched       gobuf   // 保存协程的上下文
        goid        int64   // 协程 ID
        // 其他字段...
    }
    
    • stack 字段用于存储协程的栈空间。
    • sched 字段是一个 gobuf 类型,保存了协程的上下文信息,包括程序计数器(PC)、栈指针(SP)等,使得协程可以暂停和恢复执行。
  2. M 结构体
    type m struct {
        g0      *g      // 用于调度的特殊协程
        curg    *g      // 当前正在执行的协程
        p       puintptr// 关联的 P
        // 其他字段...
    }
    
    • g0 是一个特殊的协程,用于执行调度相关的代码,如处理系统调用、调度其他协程等。
    • curg 指向当前正在执行的协程。
  3. P 结构体
    type p struct {
        id          int32
        status      uint32
        runqhead    uint32
        runqtail    uint32
        runq        [256]guintptr
        // 其他字段...
    }
    
    • P(Processor)代表处理器资源,它管理着一个本地的协程队列(runq)。每个 M 线程需要绑定一个 P 才能运行协程。P 的数量决定了同一时刻可以并行执行的协程数量,默认情况下,P 的数量等于 CPU 的核心数,可以通过 runtime.GOMAXPROCS 函数进行调整。

协程的并发控制

互斥锁(Mutex)

在并发编程中,多个协程可能会同时访问共享资源,这可能导致数据竞争和不一致问题。互斥锁(Mutex)是一种常用的同步机制,用于保证在同一时刻只有一个协程可以访问共享资源。

package main

import (
    "fmt"
    "sync"
)

var (
    counter int
    mu      sync.Mutex
)

func increment(wg *sync.WaitGroup) {
    defer wg.Done()
    mu.Lock()
    counter++
    mu.Unlock()
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go increment(&wg)
    }
    wg.Wait()
    fmt.Printf("Final counter value: %d\n", counter)
}

在上述代码中,mu 是一个 sync.Mutex 实例。在 increment 函数中,通过 mu.Lock() 锁定互斥锁,保证只有一个协程可以执行 counter++ 操作,操作完成后通过 mu.Unlock() 解锁互斥锁。

读写锁(RWMutex)

当共享资源的读操作远远多于写操作时,使用读写锁(RWMutex)可以提高并发性能。读写锁允许多个协程同时进行读操作,但在写操作时会独占资源,防止其他读写操作。

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    data    int
    rwMutex sync.RWMutex
)

func read(wg *sync.WaitGroup) {
    defer wg.Done()
    rwMutex.RLock()
    fmt.Printf("Read data: %d\n", data)
    rwMutex.RUnlock()
}

func write(wg *sync.WaitGroup) {
    defer wg.Done()
    rwMutex.Lock()
    data++
    fmt.Printf("Write data: %d\n", data)
    rwMutex.Unlock()
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go read(&wg)
    }
    time.Sleep(100 * time.Millisecond)
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go write(&wg)
    }
    wg.Wait()
}

在上述代码中,rwMutex 是一个 sync.RWMutex 实例。read 函数使用 rwMutex.RLock() 进行读锁定,允许多个协程同时读。write 函数使用 rwMutex.Lock() 进行写锁定,独占资源进行写操作。

条件变量(Cond)

条件变量(Cond)用于在某些条件满足时通知等待的协程。它通常与互斥锁一起使用。

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    mu      sync.Mutex
    cond    *sync.Cond
    counter int
)

func waiter(wg *sync.WaitGroup) {
    defer wg.Done()
    mu.Lock()
    for counter < 10 {
        fmt.Println("Waiting...")
        cond.Wait()
    }
    fmt.Printf("Condition met: counter = %d\n", counter)
    mu.Unlock()
}

func incrementer(wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < 15; i++ {
        mu.Lock()
        counter++
        fmt.Printf("Incremented counter: %d\n", counter)
        if counter >= 10 {
            cond.Broadcast()
        }
        mu.Unlock()
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    var wg sync.WaitGroup
    cond = sync.NewCond(&mu)
    wg.Add(2)
    go waiter(&wg)
    go incrementer(&wg)
    wg.Wait()
}

在上述代码中,cond 是一个 sync.Cond 实例,通过 sync.NewCond 基于 mu 互斥锁创建。waiter 协程在 counter 小于 10 时通过 cond.Wait() 等待,incrementer 协程在 counter 大于等于 10 时通过 cond.Broadcast() 通知所有等待的协程。

协程与通道(Channel)

通道的基本概念

通道(Channel)是 Go 语言中用于协程间通信的重要机制。它可以在不同的协程之间传递数据,并且自带同步功能,有助于避免数据竞争问题。

  1. 通道的创建
    ch := make(chan int)
    
    上述代码创建了一个类型为 int 的无缓冲通道。也可以创建有缓冲通道:
    ch := make(chan int, 10)
    
    这里创建了一个容量为 10 的有缓冲通道。
  2. 通道的发送与接收
    ch <- 10 // 发送数据到通道
    value := <-ch // 从通道接收数据
    
    发送和接收操作都是阻塞的。在无缓冲通道中,发送操作会阻塞直到有其他协程从通道接收数据,接收操作会阻塞直到有其他协程向通道发送数据。在有缓冲通道中,只有当通道满了时发送操作才会阻塞,通道空了时接收操作才会阻塞。

通道与协程的同步

通道可以用于协程之间的同步。例如,通过通道来等待所有协程完成任务。

package main

import (
    "fmt"
)

func worker(id int, ch chan bool) {
    fmt.Printf("Worker %d started\n", id)
    // 模拟工作
    for i := 0; i < 1000000; i++ {
        _ = i * i
    }
    fmt.Printf("Worker %d finished\n", id)
    ch <- true
}

func main() {
    const numWorkers = 5
    ch := make(chan bool, numWorkers)
    for i := 1; i <= numWorkers; i++ {
        go worker(i, ch)
    }
    for i := 0; i < numWorkers; i++ {
        <-ch
    }
    close(ch)
    fmt.Println("All workers completed")
}

在上述代码中,每个 worker 协程完成工作后向通道 ch 发送一个 truemain 协程通过从通道接收 numWorkers 次数据来等待所有 worker 协程完成任务。

通道的多路复用(Select)

select 语句用于在多个通道操作(发送或接收)之间进行选择。当有多个通道操作可以进行时,select 会随机选择一个执行。如果没有通道操作可以进行,select 会阻塞,直到有通道操作可以进行。

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)

    go func() {
        time.Sleep(2 * time.Second)
        ch1 <- 10
    }()

    go func() {
        time.Sleep(1 * time.Second)
        ch2 <- 20
    }()

    select {
    case value := <-ch1:
        fmt.Printf("Received from ch1: %d\n", value)
    case value := <-ch2:
        fmt.Printf("Received from ch2: %d\n", value)
    case <-time.After(3 * time.Second):
        fmt.Println("Timeout")
    }
}

在上述代码中,select 语句监听 ch1ch2 通道的接收操作,同时设置了一个 3 秒的超时。由于 ch2 先接收到数据,所以会输出 Received from ch2: 20。如果 ch1ch2 在 3 秒内都没有接收到数据,则会输出 Timeout

协程的异常处理

Panic 和 Recover

在 Go 语言中,panic 用于表示程序发生了不可恢复的错误,它会导致当前协程立即停止执行,并开始展开栈(unwind),调用所有已注册的 defer 语句。recover 用于在 defer 语句中捕获 panic,并恢复程序的正常执行。

package main

import (
    "fmt"
)

func divide(a, b int) int {
    defer func() {
        if r := recover(); r != nil {
            fmt.Println("Recovered from panic:", r)
        }
    }()
    if b == 0 {
        panic("division by zero")
    }
    return a / b
}

func main() {
    result := divide(10, 0)
    fmt.Println("Result:", result)
}

在上述代码中,divide 函数中如果 b 为 0 会触发 panic。通过 deferrecover,可以捕获 panic 并进行处理,使得程序不会崩溃,继续执行 main 函数中的后续代码。

处理多个协程中的 Panic

当多个协程中可能发生 panic 时,需要特别注意异常处理。可以通过通道来传递 panic 信息,以便在主协程中统一处理。

package main

import (
    "fmt"
    "sync"
)

func worker(id int, wg *sync.WaitGroup, panicCh chan interface{}) {
    defer func() {
        if r := recover(); r != nil {
            panicCh <- r
        }
        wg.Done()
    }()
    if id == 2 {
        panic("Worker 2 panicked")
    }
    fmt.Printf("Worker %d finished\n", id)
}

func main() {
    var wg sync.WaitGroup
    panicCh := make(chan interface{})
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go worker(i, &wg, panicCh)
    }
    go func() {
        wg.Wait()
        close(panicCh)
    }()
    for r := range panicCh {
        fmt.Println("Received panic:", r)
    }
}

在上述代码中,每个 worker 协程通过 recover 捕获 panic 并将其发送到 panicCh 通道。main 协程通过 for... range 从通道接收 panic 信息并进行处理。

协程性能优化

减少锁的竞争

  1. 优化锁的粒度
    • 尽量减小锁保护的代码块范围。例如,在对共享数据进行复杂计算时,可以先在无锁的情况下进行部分计算,然后只在更新共享数据时加锁。
    package main
    
    import (
        "fmt"
        "sync"
    )
    
    var (
        mu      sync.Mutex
        counter int
    )
    
    func increment() {
        local := 0
        // 无锁计算
        for i := 0; i < 1000; i++ {
            local++
        }
        mu.Lock()
        counter += local
        mu.Unlock()
    }
    
    func main() {
        var wg sync.WaitGroup
        for i := 0; i < 1000; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                increment()
            }()
        }
        wg.Wait()
        fmt.Printf("Final counter value: %d\n", counter)
    }
    
    在上述代码中,increment 函数先在无锁状态下进行局部计算,然后加锁更新共享变量 counter,减少了锁的持有时间。
  2. 使用读写锁替代互斥锁
    • 如前文所述,当读操作远多于写操作时,使用读写锁可以提高并发性能,减少锁的竞争。

合理设置 P 的数量

P 的数量决定了同一时刻可以并行执行的协程数量。默认情况下,P 的数量等于 CPU 的核心数。可以通过 runtime.GOMAXPROCS 函数来调整 P 的数量。

package main

import (
    "fmt"
    "runtime"
    "sync"
)

func worker(wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < 1000000; i++ {
        _ = i * i
    }
}

func main() {
    numProcs := runtime.NumCPU()
    runtime.GOMAXPROCS(numProcs * 2)
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    fmt.Println("All workers completed")
}

在上述代码中,通过 runtime.GOMAXPROCSP 的数量设置为 CPU 核心数的两倍,观察不同 P 数量下程序的性能表现。但需要注意,并非 P 的数量越多性能就越好,过多的 P 可能导致调度开销增大,需要根据实际应用场景进行调优。

避免不必要的协程创建

虽然协程的创建开销很小,但如果创建过多不必要的协程,也会增加系统资源的消耗。例如,在一些简单的循环操作中,如果不需要并发执行,就不应该为每个循环创建一个协程。

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    // 不推荐做法:为每个循环创建协程
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(j int) {
            defer wg.Done()
            fmt.Printf("Number: %d\n", j)
        }(i)
    }
    wg.Wait()

    // 推荐做法:直接在主协程中循环
    for i := 0; i < 10; i++ {
        fmt.Printf("Number: %d\n", i)
    }
}

在上述代码中,第一种做法为每个循环创建协程,虽然代码看起来“并发”,但实际上对于这种简单的顺序操作,创建协程带来的调度开销可能大于并发执行的收益。第二种做法直接在主协程中循环,效率更高。

通过深入理解 Go 协程的工作原理,包括调度器、并发控制、与通道的结合使用、异常处理以及性能优化等方面,开发者可以编写出高效、稳定的并发程序,充分发挥 Go 语言在并发编程方面的优势。