go 中 WaitGroup 的应用与实例
Go 语言并发编程基础
在深入探讨 WaitGroup
之前,我们先来回顾一下 Go 语言并发编程的一些基础知识。Go 语言在设计之初就将并发编程作为其核心特性之一,通过 goroutine
和 channel
提供了简洁而高效的并发编程模型。
goroutine
goroutine
是 Go 语言中实现并发的轻量级线程。与操作系统线程相比,goroutine
的创建和销毁成本极低,使得我们可以轻松创建数以万计的并发任务。启动一个 goroutine
非常简单,只需要在函数调用前加上 go
关键字即可。例如:
package main
import (
"fmt"
"time"
)
func worker(id int) {
fmt.Printf("Worker %d started\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d finished\n", id)
}
func main() {
for i := 1; i <= 3; i++ {
go worker(i)
}
time.Sleep(2 * time.Second)
fmt.Println("Main function exiting")
}
在上述代码中,main
函数通过循环启动了 3 个 goroutine
来执行 worker
函数。每个 worker
函数模拟了一个耗时任务,通过 time.Sleep
暂停 1 秒。main
函数在启动完所有 goroutine
后,通过 time.Sleep(2 * time.Second)
等待一段时间,以确保所有 goroutine
有足够时间执行完毕,最后输出 "Main function exiting"。
channel
channel
是 Go 语言中用于在 goroutine
之间进行通信和同步的机制。它可以被看作是一个类型安全的管道,数据可以从一端发送,从另一端接收。创建一个 channel
的语法如下:
ch := make(chan int)
这里创建了一个可以传输 int
类型数据的 channel
。向 channel
发送数据使用 <-
操作符:
ch <- 10
从 channel
接收数据也使用 <-
操作符:
data := <-ch
下面是一个简单的 channel
示例,展示了两个 goroutine
之间通过 channel
进行数据传递:
package main
import (
"fmt"
)
func sender(ch chan int) {
for i := 1; i <= 5; i++ {
ch <- i
}
close(ch)
}
func receiver(ch chan int) {
for data := range ch {
fmt.Printf("Received: %d\n", data)
}
}
func main() {
ch := make(chan int)
go sender(ch)
go receiver(ch)
select {}
}
在这个示例中,sender
goroutine
向 channel
中发送 1 到 5 的整数,然后通过 close(ch)
关闭 channel
。receiver
goroutine
使用 for... range
循环从 channel
中接收数据,直到 channel
关闭。main
函数中的 select {}
语句是为了防止 main
函数提前退出,因为 main
函数是整个程序的初始 goroutine
,一旦 main
函数返回,程序就会结束。
WaitGroup 概述
WaitGroup
是 Go 语言标准库 sync
包中的一个类型,用于协调多个 goroutine
的同步。它提供了一种简单的方式来等待一组 goroutine
全部完成任务。WaitGroup
内部维护了一个计数器,通过 Add
方法增加计数器的值,通过 Done
方法减少计数器的值,通过 Wait
方法阻塞当前 goroutine
,直到计数器的值变为 0。
WaitGroup 的结构
WaitGroup
的结构在 sync
包的源码中定义如下:
// src/sync/waitgroup.go
type WaitGroup struct {
noCopy noCopy
state1 [3]uint32
}
state1
字段是一个包含 3 个 uint32
类型的数组,用于存储计数器的值和等待队列的信息。虽然我们通常不需要直接操作这个结构,但了解其大致构成有助于理解 WaitGroup
的工作原理。
WaitGroup 的方法
- Add(delta int):将计数器增加
delta
。如果delta
为负数,会导致panic
。通常在启动goroutine
之前调用Add
方法,传入需要等待的goroutine
的数量。 - Done():将计数器减 1,等同于
Add(-1)
。一般在goroutine
完成任务后调用。 - Wait():阻塞当前
goroutine
,直到计数器的值变为 0。
WaitGroup 的基本应用
简单示例:等待多个 goroutine 完成
下面我们通过一个简单的示例来展示如何使用 WaitGroup
等待多个 goroutine
完成任务。假设我们有一组任务,每个任务模拟一个耗时操作,我们希望在所有任务完成后再继续执行主程序。
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d started\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d finished\n", id)
}
func main() {
var wg sync.WaitGroup
numWorkers := 3
wg.Add(numWorkers)
for i := 1; i <= numWorkers; i++ {
go worker(i, &wg)
}
wg.Wait()
fmt.Println("All workers have finished")
}
在这个示例中:
- 我们在
main
函数中创建了一个WaitGroup
实例wg
。 - 使用
wg.Add(numWorkers)
将计数器设置为需要等待的goroutine
的数量,这里是 3。 - 通过循环启动 3 个
goroutine
,每个goroutine
执行worker
函数,并将wg
的指针传递给worker
函数。 - 在
worker
函数中,通过defer wg.Done()
确保在函数结束时将计数器减 1。 main
函数调用wg.Wait()
阻塞,直到所有goroutine
完成任务,计数器变为 0,然后输出 "All workers have finished"。
避免在 goroutine 中调用 Add
在使用 WaitGroup
时,有一个重要的注意事项:不要在 goroutine
内部调用 Add
方法。这是因为 Add
方法可能会导致 WaitGroup
的状态发生变化,而在 goroutine
中调用可能会引发竞态条件。例如:
package main
import (
"fmt"
"sync"
"time"
)
func badWorker(wg *sync.WaitGroup) {
wg.Add(1)
fmt.Println("Bad worker started")
time.Sleep(time.Second)
fmt.Println("Bad worker finished")
wg.Done()
}
func main() {
var wg sync.WaitGroup
go badWorker(&wg)
wg.Wait()
fmt.Println("Main function exiting")
}
在上述代码中,badWorker
goroutine
内部调用了 wg.Add(1)
,这可能会导致在 main
函数调用 wg.Wait()
时,计数器还未正确设置,从而引发未定义行为。正确的做法是在启动 goroutine
之前就设置好计数器的值。
WaitGroup 在复杂场景中的应用
分组等待多个 goroutine 组
在实际应用中,我们可能需要管理多个不同的 goroutine
组,每个组有不同的任务,并且我们希望能够分别等待每个组的任务完成。WaitGroup
可以很方便地实现这一点。下面是一个示例,展示了如何等待两个不同组的 goroutine
完成:
package main
import (
"fmt"
"sync"
"time"
)
func group1Worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Group1 Worker %d started\n", id)
time.Sleep(time.Second)
fmt.Printf("Group1 Worker %d finished\n", id)
}
func group2Worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Group2 Worker %d started\n", id)
time.Sleep(2 * time.Second)
fmt.Printf("Group2 Worker %d finished\n", id)
}
func main() {
var wg1, wg2 sync.WaitGroup
numGroup1Workers := 2
numGroup2Workers := 3
wg1.Add(numGroup1Workers)
wg2.Add(numGroup2Workers)
for i := 1; i <= numGroup1Workers; i++ {
go group1Worker(i, &wg1)
}
for i := 1; i <= numGroup2Workers; i++ {
go group2Worker(i, &wg2)
}
fmt.Println("Waiting for Group1 to finish")
wg1.Wait()
fmt.Println("Group1 has finished")
fmt.Println("Waiting for Group2 to finish")
wg2.Wait()
fmt.Println("Group2 has finished")
fmt.Println("All groups have finished")
}
在这个示例中:
- 我们创建了两个
WaitGroup
实例wg1
和wg2
,分别用于管理两个不同组的goroutine
。 - 为每个组的
goroutine
数量设置相应的计数器值。 - 分别启动两个组的
goroutine
,每个goroutine
在完成任务时调用对应的wg.Done()
。 - 通过分别调用
wg1.Wait()
和wg2.Wait()
等待两个组的任务依次完成,并输出相应的提示信息。
实现任务流水线
WaitGroup
还可以用于实现任务流水线,即一个任务的输出作为下一个任务的输入,并且需要等待每个阶段的任务全部完成。下面是一个简单的任务流水线示例,模拟数据处理的三个阶段:读取数据、处理数据和输出数据。
package main
import (
"fmt"
"sync"
)
func readData(data chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 1; i <= 5; i++ {
data <- i
}
close(data)
}
func processData(input <-chan int, output chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for num := range input {
output <- num * num
}
close(output)
}
func writeData(input <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for result := range input {
fmt.Printf("Processed result: %d\n", result)
}
}
func main() {
var wg sync.WaitGroup
dataCh := make(chan int)
processedCh := make(chan int)
wg.Add(3)
go readData(dataCh, &wg)
go processData(dataCh, processedCh, &wg)
go writeData(processedCh, &wg)
wg.Wait()
fmt.Println("All tasks in the pipeline have finished")
}
在这个示例中:
readData
函数将数据写入dataCh
通道,完成后关闭通道,并调用wg.Done()
。processData
函数从dataCh
通道读取数据,进行平方处理后写入processedCh
通道,完成后关闭通道,并调用wg.Done()
。writeData
函数从processedCh
通道读取处理后的数据并输出,完成后调用wg.Done()
。main
函数通过wg.Wait()
等待所有三个阶段的任务完成。
WaitGroup 与其他同步机制的结合使用
WaitGroup 与 Mutex
在并发编程中,我们经常需要保护共享资源,防止多个 goroutine
同时访问导致数据竞争。Mutex
(互斥锁)是 Go 语言中用于实现互斥访问的机制。WaitGroup
可以与 Mutex
结合使用,确保在多个 goroutine
访问共享资源时的安全性。下面是一个示例,展示了多个 goroutine
对共享资源的安全访问:
package main
import (
"fmt"
"sync"
)
type Counter struct {
value int
mu sync.Mutex
}
func (c *Counter) Increment(wg *sync.WaitGroup) {
defer wg.Done()
c.mu.Lock()
c.value++
c.mu.Unlock()
}
func main() {
var wg sync.WaitGroup
counter := Counter{}
numGoroutines := 10
wg.Add(numGoroutines)
for i := 0; i < numGoroutines; i++ {
go counter.Increment(&wg)
}
wg.Wait()
fmt.Printf("Final counter value: %d\n", counter.value)
}
在这个示例中:
Counter
结构体包含一个value
字段用于存储计数值,以及一个sync.Mutex
类型的mu
字段用于保护value
。Increment
方法在增加value
之前通过c.mu.Lock()
锁定互斥锁,增加完成后通过c.mu.Unlock()
解锁,确保同一时间只有一个goroutine
可以访问value
。main
函数启动 10 个goroutine
调用Increment
方法,并通过WaitGroup
等待所有goroutine
完成,最后输出最终的计数值。
WaitGroup 与 Channel
WaitGroup
和 channel
都是 Go 语言中重要的并发编程工具,它们可以相互配合实现更复杂的同步和通信需求。例如,我们可以使用 channel
来通知 goroutine
任务的开始和结束,同时使用 WaitGroup
来等待所有相关 goroutine
完成。下面是一个示例,展示了如何通过 channel
和 WaitGroup
实现任务的有序启动和结束:
package main
import (
"fmt"
"sync"
"time"
)
func task(id int, startCh <-chan struct{}, doneCh chan<- struct{}, wg *sync.WaitGroup) {
defer wg.Done()
<-startCh
fmt.Printf("Task %d started\n", id)
time.Sleep(time.Second)
fmt.Printf("Task %d finished\n", id)
doneCh <- struct{}{}
}
func main() {
var wg sync.WaitGroup
numTasks := 3
startCh := make(chan struct{})
doneCh := make(chan struct{})
wg.Add(numTasks)
for i := 1; i <= numTasks; i++ {
go task(i, startCh, doneCh, &wg)
}
fmt.Println("Starting all tasks")
close(startCh)
for i := 0; i < numTasks; i++ {
<-doneCh
}
close(doneCh)
wg.Wait()
fmt.Println("All tasks have finished")
}
在这个示例中:
task
函数通过startCh
等待任务开始信号,接收到信号后开始执行任务,完成后通过doneCh
发送完成信号,并调用wg.Done()
。main
函数启动 3 个goroutine
执行task
函数,并通过WaitGroup
等待它们完成。- 通过关闭
startCh
通道来通知所有goroutine
开始任务,然后通过循环从doneCh
通道接收完成信号,确保所有任务都已完成,最后关闭doneCh
通道。
WaitGroup 的性能考虑
减少不必要的等待
在使用 WaitGroup
时,要尽量减少不必要的等待时间。例如,如果有一些 goroutine
的任务执行时间较短,而另一些较长,我们可以考虑将短任务和长任务分开处理,避免短任务长时间等待长任务。下面是一个优化示例,展示了如何将任务按照执行时间长短进行分组处理:
package main
import (
"fmt"
"sync"
"time"
)
func shortTask(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Short Task %d started\n", id)
time.Sleep(100 * time.Millisecond)
fmt.Printf("Short Task %d finished\n", id)
}
func longTask(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Long Task %d started\n", id)
time.Sleep(1 * time.Second)
fmt.Printf("Long Task %d finished\n", id)
}
func main() {
var shortWG, longWG sync.WaitGroup
numShortTasks := 3
numLongTasks := 2
shortWG.Add(numShortTasks)
longWG.Add(numLongTasks)
for i := 1; i <= numShortTasks; i++ {
go shortTask(i, &shortWG)
}
for i := 1; i <= numLongTasks; i++ {
go longTask(i, &longWG)
}
fmt.Println("Waiting for short tasks to finish")
shortWG.Wait()
fmt.Println("Short tasks have finished")
fmt.Println("Waiting for long tasks to finish")
longWG.Wait()
fmt.Println("Long tasks have finished")
fmt.Println("All tasks have finished")
}
在这个示例中,我们将任务分为短任务和长任务两组,分别使用 shortWG
和 longWG
进行等待。这样,短任务完成后可以及时继续执行后续逻辑,而不需要等待长任务。
避免过度使用 WaitGroup
虽然 WaitGroup
是一个强大的同步工具,但过度使用可能会导致代码的可读性和性能下降。例如,如果在一个复杂的并发系统中,每个小的子任务都使用 WaitGroup
进行同步,可能会导致代码结构变得混乱,并且增加了不必要的同步开销。在设计并发程序时,要根据具体需求合理选择同步机制,尽量使用更轻量级的方式实现同步,只有在必要时才使用 WaitGroup
。
总结
WaitGroup
是 Go 语言并发编程中一个非常实用的工具,它提供了一种简单而有效的方式来等待一组 goroutine
完成任务。通过合理运用 WaitGroup
,我们可以实现复杂的并发场景,如分组等待、任务流水线等。同时,结合 Mutex
、channel
等其他同步机制,WaitGroup
可以进一步增强程序的安全性和灵活性。在使用 WaitGroup
时,要注意遵循其使用规范,避免在 goroutine
内部调用 Add
方法,同时要考虑性能因素,减少不必要的等待和同步开销。希望通过本文的介绍和示例,你对 WaitGroup
的应用有了更深入的理解,能够在实际的 Go 语言项目中灵活运用它来实现高效的并发编程。