Go WaitGroup在复杂并发场景的应用
Go WaitGroup 基础概念
在 Go 语言的并发编程中,WaitGroup
是一个非常重要的同步工具。它的作用类似于一个计数器,用于等待一组 goroutine 完成它们的工作。WaitGroup
类型定义在 sync
包中,其结构体定义如下:
type WaitGroup struct {
noCopy noCopy
state1 [3]uint32
}
这个结构体的具体实现细节对于使用 WaitGroup
来说并不重要,但了解它的存在可以让我们知道 WaitGroup
内部是有状态的。
WaitGroup
主要有三个方法:Add
、Done
和 Wait
。
Add
方法:用于向WaitGroup
计数器添加指定的增量。例如,如果我们要启动 3 个 goroutine 并等待它们完成,我们可以调用wg.Add(3)
,这里wg
是一个WaitGroup
实例。Add
方法可以在任何时候调用,但通常在启动 goroutine 之前调用。如果在Wait
方法已经开始等待后调用Add
,可能会导致程序死锁,因为等待的 goroutine 已经在等待计数器归零,而新增加的计数不会被等待的 goroutine 知晓。
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("goroutine is running")
}()
wg.Wait()
fmt.Println("all goroutines are done")
}
在这个例子中,我们创建了一个 WaitGroup
实例 wg
,通过 wg.Add(1)
增加计数,然后启动一个 goroutine。在 goroutine 中,我们使用 defer wg.Done()
来表示该 goroutine 工作完成,最后在主 goroutine 中调用 wg.Wait()
等待所有 goroutine 完成。
Done
方法:其实它是Add(-1)
的便捷写法,用于将WaitGroup
计数器减 1。通常在 goroutine 的末尾调用Done
方法,表示该 goroutine 已经完成了它的工作。它一般和defer
关键字一起使用,这样即使 goroutine 发生 panic,计数器也能正确减 1。Wait
方法:该方法会阻塞调用它的 goroutine,直到WaitGroup
计数器归零。也就是说,只有当所有通过Add
方法增加的计数都通过Done
方法减少到 0 时,Wait
才会返回,继续执行后续代码。
简单并发场景下的 WaitGroup
在简单的并发场景中,WaitGroup
的使用非常直观。例如,我们有多个独立的任务需要并发执行,然后等待所有任务完成后再进行下一步操作。
package main
import (
"fmt"
"sync"
"time"
)
func task(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Task %d started\n", id)
time.Sleep(time.Second)
fmt.Printf("Task %d finished\n", id)
}
func main() {
var wg sync.WaitGroup
numTasks := 3
for i := 1; i <= numTasks; i++ {
wg.Add(1)
go task(i, &wg)
}
wg.Wait()
fmt.Println("All tasks are completed")
}
在这个示例中,我们定义了一个 task
函数,每个 task
模拟了一个需要花费 1 秒时间完成的工作。在 main
函数中,我们通过循环启动 3 个 goroutine 来执行 task
函数,并使用 WaitGroup
来等待所有任务完成。这种场景下,WaitGroup
简单直接地帮助我们实现了并发任务的同步。
复杂并发场景下 WaitGroup 的应用
- 层级式并发任务 在一些复杂的业务场景中,任务可能呈现层级结构。例如,我们有一个主任务,它启动多个子任务,而每个子任务又可能启动更多的孙任务。
package main
import (
"fmt"
"sync"
"time"
)
func subTask(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Sub - task %d started\n", id)
var subWg sync.WaitGroup
subWg.Add(2)
go func() {
defer subWg.Done()
fmt.Printf("Grand - sub - task 1 of sub - task %d started\n", id)
time.Sleep(time.Second)
fmt.Printf("Grand - sub - task 1 of sub - task %d finished\n", id)
}()
go func() {
defer subWg.Done()
fmt.Printf("Grand - sub - task 2 of sub - task %d started\n", id)
time.Sleep(time.Second)
fmt.Printf("Grand - sub - task 2 of sub - task %d finished\n", id)
}()
subWg.Wait()
fmt.Printf("Sub - task %d finished\n", id)
}
func main() {
var wg sync.WaitGroup
numSubTasks := 3
for i := 1; i <= numSubTasks; i++ {
wg.Add(1)
go subTask(i, &wg)
}
wg.Wait()
fmt.Println("All tasks are completed")
}
在这个例子中,subTask
函数模拟了子任务,每个子任务又启动了两个孙任务。我们通过在子任务内部创建新的 WaitGroup
(subWg
)来等待孙任务完成,同时在主任务中使用 wg
来等待所有子任务完成。这样,通过层级式地使用 WaitGroup
,我们可以很好地管理复杂的层级并发任务。
- 并发任务依赖关系处理
有时候,并发任务之间存在依赖关系。例如,任务 B 需要在任务 A 完成后才能开始。我们可以使用
WaitGroup
结合通道(channel)来实现这种依赖关系。
package main
import (
"fmt"
"sync"
"time"
)
func taskA(wg *sync.WaitGroup) {
defer wg.Done()
fmt.Println("Task A started")
time.Sleep(time.Second)
fmt.Println("Task A finished")
}
func taskB(wg *sync.WaitGroup, signal chan struct{}) {
defer wg.Done()
<-signal
fmt.Println("Task B started")
time.Sleep(time.Second)
fmt.Println("Task B finished")
}
func main() {
var wg sync.WaitGroup
wg.Add(2)
signal := make(chan struct{})
go taskA(&wg)
go taskB(&wg, signal)
go func() {
wg.Wait()
close(signal)
}()
}
在这个示例中,taskB
依赖于 taskA
的完成。我们通过一个无缓冲通道 signal
来传递任务 A 完成的信号。在 main
函数中,我们启动 taskA
和 taskB
,同时使用一个匿名 goroutine 来等待 taskA
和 taskB
完成(通过 wg.Wait()
),然后关闭通道 signal
,这样 taskB
就可以接收到信号并开始执行。
- 动态创建和等待 goroutine 在某些场景下,我们可能需要根据运行时的条件动态地创建和等待 goroutine。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
duration := time.Duration(rand.Intn(3)) * time.Second
fmt.Printf("Worker %d started, will run for %v\n", id, duration)
time.Sleep(duration)
fmt.Printf("Worker %d finished\n", id)
}
func main() {
var wg sync.WaitGroup
numWorkers := rand.Intn(5) + 1
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, &wg)
}
wg.Wait()
fmt.Println("All workers are done")
}
在这个例子中,我们使用 rand.Intn(5) + 1
动态生成需要启动的 goroutine 数量。每个 worker
函数模拟一个随机执行时间的任务。通过 WaitGroup
,我们可以方便地等待所有动态创建的 goroutine 完成。
- 错误处理与 WaitGroup
在实际应用中,并发任务可能会出错。我们可以结合
WaitGroup
和错误处理机制来确保程序的健壮性。
package main
import (
"errors"
"fmt"
"sync"
"time"
)
var errTaskFailed = errors.New("task failed")
func task(id int, wg *sync.WaitGroup, errChan chan error) {
defer wg.Done()
if id == 2 {
errChan <- errTaskFailed
return
}
fmt.Printf("Task %d started\n", id)
time.Sleep(time.Second)
fmt.Printf("Task %d finished\n", id)
}
func main() {
var wg sync.WaitGroup
numTasks := 3
errChan := make(chan error, numTasks)
for i := 1; i <= numTasks; i++ {
wg.Add(1)
go task(i, &wg, errChan)
}
go func() {
wg.Wait()
close(errChan)
}()
for err := range errChan {
if err != nil {
fmt.Printf("Error: %v\n", err)
}
}
fmt.Println("All tasks processed")
}
在这个示例中,task
函数可能会返回错误。我们通过一个 errChan
通道来传递错误信息。在 main
函数中,我们启动多个任务,并使用 WaitGroup
等待所有任务完成。当所有任务完成后,我们关闭 errChan
通道,并通过 for... range
循环读取通道中的错误信息进行处理。
WaitGroup 使用中的注意事项
- 避免重复等待
不要在同一个
WaitGroup
上多次调用Wait
方法,除非你明确知道自己在做什么。例如,在一个函数中调用wg.Wait()
后,又在另一个地方再次调用wg.Wait()
,这可能会导致意外的阻塞行为,因为第一次Wait
已经阻塞到计数器归零,第二次调用可能会永远阻塞。 - 正确使用 Add 和 Done
确保
Add
和Done
的调用次数匹配。如果Add
的次数多于Done
,Wait
方法将永远阻塞;反之,如果Done
的次数多于Add
,可能会导致运行时恐慌(panic)。 - 防止死锁
在并发编程中,死锁是一个常见的问题。当在
Wait
方法已经开始等待后调用Add
,或者Add
的次数与Done
不匹配导致Wait
永远阻塞时,都可能发生死锁。仔细设计并发逻辑,确保WaitGroup
的使用正确无误可以避免死锁。
与其他同步工具的结合使用
- WaitGroup 与 Mutex
在一些场景下,我们可能需要保护共享资源,同时等待一组 goroutine 完成。这时可以结合
WaitGroup
和Mutex
。
package main
import (
"fmt"
"sync"
)
type Data struct {
value int
mu sync.Mutex
}
func worker(id int, data *Data, wg *sync.WaitGroup) {
defer wg.Done()
data.mu.Lock()
data.value += id
data.mu.Unlock()
fmt.Printf("Worker %d updated data value to %d\n", id, data.value)
}
func main() {
var wg sync.WaitGroup
numWorkers := 3
data := Data{}
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, &data, &wg)
}
wg.Wait()
fmt.Printf("Final data value: %d\n", data.value)
}
在这个例子中,多个 worker
goroutine 需要修改共享的 Data
结构体中的 value
字段。我们使用 Mutex
来保护对 value
的并发访问,同时使用 WaitGroup
等待所有 worker
完成任务。
- WaitGroup 与 Channel
我们前面已经看到了一些结合
WaitGroup
和通道的示例。通道可以用于在 goroutine 之间传递数据和信号,而WaitGroup
用于等待一组 goroutine 完成。它们的结合可以实现非常复杂的并发控制逻辑。例如,我们可以使用通道来分发任务,然后使用WaitGroup
等待所有任务处理完成。
package main
import (
"fmt"
"sync"
)
func worker(taskChan <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for task := range taskChan {
fmt.Printf("Processing task %d\n", task)
}
}
func main() {
var wg sync.WaitGroup
numWorkers := 3
taskChan := make(chan int)
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(taskChan, &wg)
}
for i := 1; i <= 5; i++ {
taskChan <- i
}
close(taskChan)
wg.Wait()
fmt.Println("All tasks are processed")
}
在这个示例中,我们通过 taskChan
通道向多个 worker
goroutine 分发任务。每个 worker
从通道中读取任务并处理,直到通道关闭。WaitGroup
用于等待所有 worker
完成任务。
通过以上详细的介绍和丰富的代码示例,我们深入了解了 Go 语言中 WaitGroup
在复杂并发场景下的应用。掌握 WaitGroup
的正确使用方法,结合其他同步工具,可以帮助我们编写高效、健壮的并发程序。无论是层级式任务、任务依赖关系处理,还是动态创建 goroutine 等场景,WaitGroup
都能发挥重要作用,同时在使用过程中注意避免常见的问题,如死锁、重复等待等,确保程序的正确性和稳定性。