Go了解协程的工作原理
Go 协程基础概念
什么是协程
在 Go 语言中,协程(goroutine)是一种轻量级的线程执行单元。与操作系统线程不同,协程由 Go 运行时(runtime)管理,而不是操作系统内核。这使得创建和销毁协程的开销极小,能够轻松创建成千上万的协程,用于并发编程。
协程与线程的区别
- 资源开销
- 线程:操作系统线程通常需要较大的栈空间(一般为几 MB),创建和销毁线程的系统开销较大,因为涉及操作系统内核的调度和资源分配。
- 协程:Go 协程的栈空间初始时非常小(通常为 2KB 左右),并且可以根据需要动态增长和收缩。创建和销毁协程几乎没有什么开销,由 Go 运行时在用户态进行管理,无需内核参与。
- 调度方式
- 线程:操作系统采用抢占式调度,内核决定哪个线程何时运行以及运行多长时间。这种调度方式可能导致线程在执行关键操作时被突然中断,需要复杂的同步机制来保证数据一致性。
- 协程:Go 协程采用协作式调度,协程会主动让出执行权。当一个协程执行如 I/O 操作、系统调用、调用
runtime.Gosched()
函数等操作时,它会暂停执行,让其他协程有机会运行。
简单的协程示例
下面通过一个简单的代码示例来展示如何在 Go 中创建和运行协程:
package main
import (
"fmt"
"time"
)
func printNumbers() {
for i := 1; i <= 5; i++ {
fmt.Printf("Number: %d\n", i)
time.Sleep(100 * time.Millisecond)
}
}
func printLetters() {
for i := 'a'; i <= 'e'; i++ {
fmt.Printf("Letter: %c\n", i)
time.Sleep(100 * time.Millisecond)
}
}
func main() {
go printNumbers()
go printLetters()
time.Sleep(1000 * time.Millisecond)
}
在上述代码中,main
函数中通过 go
关键字启动了两个协程,分别执行 printNumbers
和 printLetters
函数。time.Sleep
函数用于模拟一些工作,防止主线程过早退出。这两个协程并发执行,交替输出数字和字母。
Go 协程调度器
调度器的组成
Go 协程调度器主要由三个部分组成:M:N 调度模型、Goroutine 队列和调度器的运行机制。
- M:N 调度模型
- Go 语言采用 M:N 调度模型,即 N 个协程映射到 M 个操作系统线程上(N >> M)。这种模型结合了 1:1 模型(每个用户线程映射到一个内核线程)和 N:1 模型(多个用户线程映射到一个内核线程)的优点。它既能利用多核 CPU 的并行性,又能通过协程的轻量级特性高效地管理大量并发任务。
- 在 Go 调度器中,M 代表操作系统线程(Machine),N 代表协程(Goroutine)。多个协程可以复用少量的操作系统线程,减少了线程创建和上下文切换的开销。
- Goroutine 队列
- 全局队列:Go 运行时维护一个全局的协程队列。当一个新的协程被创建时,它有可能被放入全局队列中。当一个 M 线程在本地队列中找不到可运行的协程时,它会尝试从全局队列中获取协程来执行。
- 本地队列:每个 M 线程都有一个本地的协程队列。当一个协程被创建时,优先被放入创建它的 M 线程的本地队列中。这样,M 线程可以快速地从本地队列中获取协程执行,减少了锁的竞争。当本地队列满了或者需要平衡负载时,协程会被转移到全局队列。
- 调度器的运行机制
- 创建协程:当通过
go
关键字创建一个新的协程时,Go 运行时会为其分配一个唯一的标识符(GID),并初始化协程的栈空间等资源。然后,协程会被放入创建它的 M 线程的本地队列或者全局队列。 - 调度协程:每个 M 线程会不断地从本地队列中取出协程并执行。当本地队列为空时,M 线程会尝试从全局队列中获取协程。如果全局队列也为空,M 线程会尝试从其他 M 线程的本地队列中偷取一部分协程(工作窃取算法)。
- 协程暂停与恢复:当一个协程执行到如 I/O 操作、系统调用等阻塞操作时,它会暂停执行,并将自己从 M 线程的执行队列中移除。然后,M 线程会从队列中取出其他协程继续执行。当阻塞操作完成后,暂停的协程会被重新放入队列,等待再次被调度执行。
- 创建协程:当通过
调度器的关键数据结构
- Goroutine 结构体
type g struct { stack stack // 协程栈 stackguard0 uintptr // 栈保护边界 stackguard1 uintptr // 栈保护边界(不同架构下使用) _panic *_panic // 当前协程的 panic 链表 _defer *_defer // 当前协程的 defer 链表 m *m // 绑定的 M 线程 sched gobuf // 保存协程的上下文 goid int64 // 协程 ID // 其他字段... }
stack
字段用于存储协程的栈空间。sched
字段是一个gobuf
类型,保存了协程的上下文信息,包括程序计数器(PC)、栈指针(SP)等,使得协程可以暂停和恢复执行。
- M 结构体
type m struct { g0 *g // 用于调度的特殊协程 curg *g // 当前正在执行的协程 p puintptr// 关联的 P // 其他字段... }
g0
是一个特殊的协程,用于执行调度相关的代码,如处理系统调用、调度其他协程等。curg
指向当前正在执行的协程。
- P 结构体
type p struct { id int32 status uint32 runqhead uint32 runqtail uint32 runq [256]guintptr // 其他字段... }
P
(Processor)代表处理器资源,它管理着一个本地的协程队列(runq
)。每个 M 线程需要绑定一个 P 才能运行协程。P
的数量决定了同一时刻可以并行执行的协程数量,默认情况下,P
的数量等于 CPU 的核心数,可以通过runtime.GOMAXPROCS
函数进行调整。
协程的并发控制
互斥锁(Mutex)
在并发编程中,多个协程可能会同时访问共享资源,这可能导致数据竞争和不一致问题。互斥锁(Mutex)是一种常用的同步机制,用于保证在同一时刻只有一个协程可以访问共享资源。
package main
import (
"fmt"
"sync"
)
var (
counter int
mu sync.Mutex
)
func increment(wg *sync.WaitGroup) {
defer wg.Done()
mu.Lock()
counter++
mu.Unlock()
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go increment(&wg)
}
wg.Wait()
fmt.Printf("Final counter value: %d\n", counter)
}
在上述代码中,mu
是一个 sync.Mutex
实例。在 increment
函数中,通过 mu.Lock()
锁定互斥锁,保证只有一个协程可以执行 counter++
操作,操作完成后通过 mu.Unlock()
解锁互斥锁。
读写锁(RWMutex)
当共享资源的读操作远远多于写操作时,使用读写锁(RWMutex)可以提高并发性能。读写锁允许多个协程同时进行读操作,但在写操作时会独占资源,防止其他读写操作。
package main
import (
"fmt"
"sync"
"time"
)
var (
data int
rwMutex sync.RWMutex
)
func read(wg *sync.WaitGroup) {
defer wg.Done()
rwMutex.RLock()
fmt.Printf("Read data: %d\n", data)
rwMutex.RUnlock()
}
func write(wg *sync.WaitGroup) {
defer wg.Done()
rwMutex.Lock()
data++
fmt.Printf("Write data: %d\n", data)
rwMutex.Unlock()
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go read(&wg)
}
time.Sleep(100 * time.Millisecond)
for i := 0; i < 2; i++ {
wg.Add(1)
go write(&wg)
}
wg.Wait()
}
在上述代码中,rwMutex
是一个 sync.RWMutex
实例。read
函数使用 rwMutex.RLock()
进行读锁定,允许多个协程同时读。write
函数使用 rwMutex.Lock()
进行写锁定,独占资源进行写操作。
条件变量(Cond)
条件变量(Cond)用于在某些条件满足时通知等待的协程。它通常与互斥锁一起使用。
package main
import (
"fmt"
"sync"
"time"
)
var (
mu sync.Mutex
cond *sync.Cond
counter int
)
func waiter(wg *sync.WaitGroup) {
defer wg.Done()
mu.Lock()
for counter < 10 {
fmt.Println("Waiting...")
cond.Wait()
}
fmt.Printf("Condition met: counter = %d\n", counter)
mu.Unlock()
}
func incrementer(wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 15; i++ {
mu.Lock()
counter++
fmt.Printf("Incremented counter: %d\n", counter)
if counter >= 10 {
cond.Broadcast()
}
mu.Unlock()
time.Sleep(100 * time.Millisecond)
}
}
func main() {
var wg sync.WaitGroup
cond = sync.NewCond(&mu)
wg.Add(2)
go waiter(&wg)
go incrementer(&wg)
wg.Wait()
}
在上述代码中,cond
是一个 sync.Cond
实例,通过 sync.NewCond
基于 mu
互斥锁创建。waiter
协程在 counter
小于 10 时通过 cond.Wait()
等待,incrementer
协程在 counter
大于等于 10 时通过 cond.Broadcast()
通知所有等待的协程。
协程与通道(Channel)
通道的基本概念
通道(Channel)是 Go 语言中用于协程间通信的重要机制。它可以在不同的协程之间传递数据,并且自带同步功能,有助于避免数据竞争问题。
- 通道的创建
上述代码创建了一个类型为ch := make(chan int)
int
的无缓冲通道。也可以创建有缓冲通道:
这里创建了一个容量为 10 的有缓冲通道。ch := make(chan int, 10)
- 通道的发送与接收
发送和接收操作都是阻塞的。在无缓冲通道中,发送操作会阻塞直到有其他协程从通道接收数据,接收操作会阻塞直到有其他协程向通道发送数据。在有缓冲通道中,只有当通道满了时发送操作才会阻塞,通道空了时接收操作才会阻塞。ch <- 10 // 发送数据到通道 value := <-ch // 从通道接收数据
通道与协程的同步
通道可以用于协程之间的同步。例如,通过通道来等待所有协程完成任务。
package main
import (
"fmt"
)
func worker(id int, ch chan bool) {
fmt.Printf("Worker %d started\n", id)
// 模拟工作
for i := 0; i < 1000000; i++ {
_ = i * i
}
fmt.Printf("Worker %d finished\n", id)
ch <- true
}
func main() {
const numWorkers = 5
ch := make(chan bool, numWorkers)
for i := 1; i <= numWorkers; i++ {
go worker(i, ch)
}
for i := 0; i < numWorkers; i++ {
<-ch
}
close(ch)
fmt.Println("All workers completed")
}
在上述代码中,每个 worker
协程完成工作后向通道 ch
发送一个 true
。main
协程通过从通道接收 numWorkers
次数据来等待所有 worker
协程完成任务。
通道的多路复用(Select)
select
语句用于在多个通道操作(发送或接收)之间进行选择。当有多个通道操作可以进行时,select
会随机选择一个执行。如果没有通道操作可以进行,select
会阻塞,直到有通道操作可以进行。
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go func() {
time.Sleep(2 * time.Second)
ch1 <- 10
}()
go func() {
time.Sleep(1 * time.Second)
ch2 <- 20
}()
select {
case value := <-ch1:
fmt.Printf("Received from ch1: %d\n", value)
case value := <-ch2:
fmt.Printf("Received from ch2: %d\n", value)
case <-time.After(3 * time.Second):
fmt.Println("Timeout")
}
}
在上述代码中,select
语句监听 ch1
和 ch2
通道的接收操作,同时设置了一个 3 秒的超时。由于 ch2
先接收到数据,所以会输出 Received from ch2: 20
。如果 ch1
和 ch2
在 3 秒内都没有接收到数据,则会输出 Timeout
。
协程的异常处理
Panic 和 Recover
在 Go 语言中,panic
用于表示程序发生了不可恢复的错误,它会导致当前协程立即停止执行,并开始展开栈(unwind),调用所有已注册的 defer
语句。recover
用于在 defer
语句中捕获 panic
,并恢复程序的正常执行。
package main
import (
"fmt"
)
func divide(a, b int) int {
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered from panic:", r)
}
}()
if b == 0 {
panic("division by zero")
}
return a / b
}
func main() {
result := divide(10, 0)
fmt.Println("Result:", result)
}
在上述代码中,divide
函数中如果 b
为 0 会触发 panic
。通过 defer
和 recover
,可以捕获 panic
并进行处理,使得程序不会崩溃,继续执行 main
函数中的后续代码。
处理多个协程中的 Panic
当多个协程中可能发生 panic
时,需要特别注意异常处理。可以通过通道来传递 panic
信息,以便在主协程中统一处理。
package main
import (
"fmt"
"sync"
)
func worker(id int, wg *sync.WaitGroup, panicCh chan interface{}) {
defer func() {
if r := recover(); r != nil {
panicCh <- r
}
wg.Done()
}()
if id == 2 {
panic("Worker 2 panicked")
}
fmt.Printf("Worker %d finished\n", id)
}
func main() {
var wg sync.WaitGroup
panicCh := make(chan interface{})
for i := 1; i <= 3; i++ {
wg.Add(1)
go worker(i, &wg, panicCh)
}
go func() {
wg.Wait()
close(panicCh)
}()
for r := range panicCh {
fmt.Println("Received panic:", r)
}
}
在上述代码中,每个 worker
协程通过 recover
捕获 panic
并将其发送到 panicCh
通道。main
协程通过 for... range
从通道接收 panic
信息并进行处理。
协程性能优化
减少锁的竞争
- 优化锁的粒度
- 尽量减小锁保护的代码块范围。例如,在对共享数据进行复杂计算时,可以先在无锁的情况下进行部分计算,然后只在更新共享数据时加锁。
在上述代码中,package main import ( "fmt" "sync" ) var ( mu sync.Mutex counter int ) func increment() { local := 0 // 无锁计算 for i := 0; i < 1000; i++ { local++ } mu.Lock() counter += local mu.Unlock() } func main() { var wg sync.WaitGroup for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() increment() }() } wg.Wait() fmt.Printf("Final counter value: %d\n", counter) }
increment
函数先在无锁状态下进行局部计算,然后加锁更新共享变量counter
,减少了锁的持有时间。 - 使用读写锁替代互斥锁
- 如前文所述,当读操作远多于写操作时,使用读写锁可以提高并发性能,减少锁的竞争。
合理设置 P 的数量
P
的数量决定了同一时刻可以并行执行的协程数量。默认情况下,P
的数量等于 CPU 的核心数。可以通过 runtime.GOMAXPROCS
函数来调整 P
的数量。
package main
import (
"fmt"
"runtime"
"sync"
)
func worker(wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 1000000; i++ {
_ = i * i
}
}
func main() {
numProcs := runtime.NumCPU()
runtime.GOMAXPROCS(numProcs * 2)
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
fmt.Println("All workers completed")
}
在上述代码中,通过 runtime.GOMAXPROCS
将 P
的数量设置为 CPU 核心数的两倍,观察不同 P
数量下程序的性能表现。但需要注意,并非 P
的数量越多性能就越好,过多的 P
可能导致调度开销增大,需要根据实际应用场景进行调优。
避免不必要的协程创建
虽然协程的创建开销很小,但如果创建过多不必要的协程,也会增加系统资源的消耗。例如,在一些简单的循环操作中,如果不需要并发执行,就不应该为每个循环创建一个协程。
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
// 不推荐做法:为每个循环创建协程
for i := 0; i < 10; i++ {
wg.Add(1)
go func(j int) {
defer wg.Done()
fmt.Printf("Number: %d\n", j)
}(i)
}
wg.Wait()
// 推荐做法:直接在主协程中循环
for i := 0; i < 10; i++ {
fmt.Printf("Number: %d\n", i)
}
}
在上述代码中,第一种做法为每个循环创建协程,虽然代码看起来“并发”,但实际上对于这种简单的顺序操作,创建协程带来的调度开销可能大于并发执行的收益。第二种做法直接在主协程中循环,效率更高。
通过深入理解 Go 协程的工作原理,包括调度器、并发控制、与通道的结合使用、异常处理以及性能优化等方面,开发者可以编写出高效、稳定的并发程序,充分发挥 Go 语言在并发编程方面的优势。