Go语言WaitGroup的实用场景与原理
Go语言并发编程基础
在深入探讨 WaitGroup
之前,我们先来回顾一下Go语言并发编程的基础概念。Go语言的并发编程模型基于 goroutine
和 channel
。
goroutine
goroutine
是Go语言中轻量级的线程执行单元。与传统线程相比,goroutine
的创建和销毁成本极低。在Go语言中,通过 go
关键字就可以轻松启动一个 goroutine
。例如:
package main
import (
"fmt"
"time"
)
func printNumbers() {
for i := 1; i <= 5; i++ {
fmt.Println("Number:", i)
time.Sleep(time.Millisecond * 500)
}
}
func printLetters() {
for i := 'a'; i <= 'e'; i++ {
fmt.Println("Letter:", string(i))
time.Sleep(time.Millisecond * 500)
}
}
func main() {
go printNumbers()
go printLetters()
time.Sleep(time.Second * 3)
}
在上述代码中,我们通过 go
关键字启动了两个 goroutine
,分别执行 printNumbers
和 printLetters
函数。main
函数本身也是一个 goroutine
。这里需要注意的是,main
函数中的 time.Sleep
是为了防止 main
函数过早退出,导致其他 goroutine
还未执行完毕就被终止。
channel
channel
是Go语言中用于在 goroutine
之间进行通信的机制,它可以看作是一个类型化的管道。通过 channel
,不同的 goroutine
可以安全地传递数据。例如:
package main
import (
"fmt"
)
func sendData(ch chan int) {
for i := 1; i <= 5; i++ {
ch <- i
}
close(ch)
}
func receiveData(ch chan int) {
for num := range ch {
fmt.Println("Received:", num)
}
}
func main() {
ch := make(chan int)
go sendData(ch)
go receiveData(ch)
select {}
}
在这段代码中,我们创建了一个整型的 channel
。sendData
函数向 channel
中发送数据,receiveData
函数从 channel
中接收数据。for... range
结构在 channel
关闭时会自动退出循环。最后的 select {}
语句是为了防止 main
函数退出,保证 goroutine
有足够的时间执行。
WaitGroup 介绍
在Go语言的并发编程中,我们经常会遇到需要等待一组 goroutine
全部完成后再进行下一步操作的场景。例如,在一个Web爬虫程序中,可能会启动多个 goroutine
同时抓取不同网页的数据,只有当所有网页都抓取完毕后,才能对数据进行统一的分析和处理。这时,WaitGroup
就派上了用场。
WaitGroup
是Go标准库 sync
包中的一个类型,它提供了一种简单的机制来等待一组 goroutine
完成。WaitGroup
内部维护了一个计数器,通过调用 Add
方法来增加计数器的值,通过调用 Done
方法来减少计数器的值,通过调用 Wait
方法来阻塞当前 goroutine
,直到计数器的值变为0。
WaitGroup 的使用方法
初始化 WaitGroup
在使用 WaitGroup
之前,需要先进行初始化。通常是在 main
函数或者其他需要等待 goroutine
完成的地方定义一个 WaitGroup
变量。例如:
var wg sync.WaitGroup
也可以使用 sync.WaitGroup
类型的指针,通过 new
关键字或者 &
取地址符来初始化:
wg := new(sync.WaitGroup)
// 或者
wg := &sync.WaitGroup{}
Add 方法
Add
方法用于增加 WaitGroup
内部计数器的值。一般在启动 goroutine
之前调用 Add
方法,参数为要启动的 goroutine
的数量。例如:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d started\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d finished\n", id)
}
func main() {
var wg sync.WaitGroup
numWorkers := 3
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, &wg)
}
wg.Wait()
fmt.Println("All workers have finished")
}
在上述代码中,我们在启动每个 worker
goroutine
之前调用 wg.Add(1)
,表示有一个 goroutine
要执行。
Done 方法
Done
方法用于减少 WaitGroup
内部计数器的值,通常在 goroutine
完成任务后调用。一般会使用 defer
关键字来确保 Done
方法一定会被调用,即使 goroutine
执行过程中发生错误。例如在上面的 worker
函数中:
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d started\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d finished\n", id)
}
defer wg.Done()
会在 worker
函数返回时调用,将 WaitGroup
的计数器减1。
Wait 方法
Wait
方法用于阻塞当前 goroutine
,直到 WaitGroup
内部计数器的值变为0。在需要等待所有 goroutine
完成的地方调用 Wait
方法。例如在上面的 main
函数中:
func main() {
var wg sync.WaitGroup
numWorkers := 3
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, &wg)
}
wg.Wait()
fmt.Println("All workers have finished")
}
wg.Wait()
会阻塞 main
goroutine
,直到所有 worker
goroutine
都调用了 wg.Done()
,计数器变为0,然后 main
goroutine
继续执行后续的代码。
WaitGroup 的实用场景
并行任务处理
在许多实际应用中,我们需要并行处理多个任务,然后等待所有任务完成后再进行统一的结果汇总或后续操作。例如,在一个数据分析程序中,可能需要从多个数据源获取数据,每个数据源的获取操作可以并行进行,当所有数据都获取完毕后,再进行数据的合并和分析。
package main
import (
"fmt"
"sync"
"time"
)
func fetchData(source int, result *[]int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Fetching data from source %d\n", source)
time.Sleep(time.Second)
*result = append(*result, source*10)
}
func main() {
var wg sync.WaitGroup
data := make([]int, 0)
numSources := 3
for i := 1; i <= numSources; i++ {
wg.Add(1)
go fetchData(i, &data, &wg)
}
wg.Wait()
fmt.Println("All data fetched:", data)
}
在这个例子中,我们模拟从多个数据源获取数据。每个 fetchData
goroutine
从一个数据源获取数据(这里简单地返回数据源编号乘以10),并将结果添加到 data
切片中。main
函数通过 WaitGroup
等待所有数据获取完成后,输出最终的数据。
批量文件处理
假设我们需要对一批文件进行处理,例如压缩、加密等操作。为了提高效率,可以并行处理这些文件,然后等待所有文件处理完成后再进行下一步操作,比如记录处理日志。
package main
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
)
func processFile(filePath string, wg *sync.WaitGroup) {
defer wg.Done()
data, err := ioutil.ReadFile(filePath)
if err != nil {
fmt.Printf("Error reading file %s: %v\n", filePath, err)
return
}
// 这里可以进行文件处理操作,例如加密、压缩等
newFilePath := filePath + ".processed"
err = ioutil.WriteFile(newFilePath, data, 0644)
if err != nil {
fmt.Printf("Error writing processed file %s: %v\n", newFilePath, err)
}
fmt.Printf("Processed file %s\n", filePath)
}
func main() {
var wg sync.WaitGroup
files, err := filepath.Glob("*.txt")
if err != nil {
fmt.Printf("Error getting files: %v\n", err)
return
}
for _, file := range files {
wg.Add(1)
go processFile(file, &wg)
}
wg.Wait()
fmt.Println("All files processed")
}
在这个代码示例中,我们使用 filepath.Glob
获取所有符合条件的文件(这里假设是所有 .txt
文件),然后为每个文件启动一个 goroutine
进行处理。processFile
函数读取文件内容,然后可以在其中添加实际的文件处理逻辑(这里只是简单地将处理后的内容写入一个新文件)。main
函数通过 WaitGroup
等待所有文件处理完成后输出提示信息。
并发数据库操作
在一个数据库应用中,可能需要同时执行多个数据库查询操作,然后等待所有查询结果返回后进行汇总分析。
package main
import (
"database/sql"
"fmt"
"sync"
_ "github.com/lib/pq" // 这里假设使用PostgreSQL,实际根据需求更换
)
func queryDB(db *sql.DB, query string, result *[]string, wg *sync.WaitGroup) {
defer wg.Done()
rows, err := db.Query(query)
if err != nil {
fmt.Printf("Error querying database: %v\n", err)
return
}
defer rows.Close()
var data string
for rows.Next() {
err := rows.Scan(&data)
if err != nil {
fmt.Printf("Error scanning row: %v\n", err)
continue
}
*result = append(*result, data)
}
if err := rows.Err(); err != nil {
fmt.Printf("Error in rows: %v\n", err)
}
}
func main() {
db, err := sql.Open("postgres", "user=postgres dbname=mydb sslmode=disable")
if err != nil {
fmt.Printf("Error opening database: %v\n", err)
return
}
defer db.Close()
var wg sync.WaitGroup
results := make([]string, 0)
queries := []string{
"SELECT column1 FROM table1",
"SELECT column2 FROM table2",
}
for _, query := range queries {
wg.Add(1)
go queryDB(db, query, &results, &wg)
}
wg.Wait()
fmt.Println("All queries completed. Results:", results)
}
在这个示例中,我们假设使用PostgreSQL数据库(实际应用中可根据具体数据库进行调整)。queryDB
函数执行一个数据库查询,并将结果添加到 results
切片中。main
函数为每个查询启动一个 goroutine
,并通过 WaitGroup
等待所有查询完成后输出结果。
WaitGroup 的原理
内部结构
WaitGroup
的实现位于Go标准库的 src/sync/waitgroup.go
文件中。其内部结构定义如下:
// WaitGroup is a synchronization primitive used to wait for a collection of goroutines to finish.
// The main use of a WaitGroup is to wait for all the goroutines launched by a piece of code to complete.
// A WaitGroup must not be copied after first use.
type WaitGroup struct {
noCopy noCopy
// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 386 and arm
// do not enforce it. The compiler will allocate 8 bytes anyway, so we
// can just use the alignment.
state1 [3]uint32
}
WaitGroup
内部使用一个 state1
数组来存储状态信息,其中高32位存储计数器的值,低32位存储等待的 goroutine
的数量。noCopy
字段用于防止 WaitGroup
被意外复制,因为复制 WaitGroup
可能会导致未定义行为。
Add 方法的实现
Add
方法的实现如下:
// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if delta > 0 && v == int32(delta) {
// The first increment after the counter was zero must not wake
// any goroutines. Avoids a race with Wait.
return
}
if w != 0 && delta < 0 {
// Decrementing waiters count shouldn't happen concurrently with
// Wait. This is useful for debugging, so we can detect
// incorrect usage of WaitGroup and report a clearer error.
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if v > 0 || w == 0 {
return
}
// Counter is zero, and waiters are pending.
// Reset waiters count to 0.
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
Add
方法通过 atomic.AddUint64
原子操作来增加计数器的值。如果计数器变为0,并且有等待的 goroutine
,则通过 runtime_Semrelease
释放所有等待的 goroutine
。如果计数器变为负数,会触发 panic
。
Done 方法的实现
Done
方法实际上是 Add(-1)
的快捷方式,其实现如下:
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
这样,在 goroutine
完成任务后,通过调用 Done
方法就可以方便地减少计数器的值。
Wait 方法的实现
Wait
方法的实现如下:
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
if v == 0 {
// Counter is 0, no need to wait.
return
}
// Increment waiters count.
if atomic.CompareAndSwapUint64(statep, state, state+1) {
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
Wait
方法通过 atomic.LoadUint64
原子操作加载当前的状态值。如果计数器为0,则直接返回。否则,通过 atomic.CompareAndSwapUint64
尝试增加等待的 goroutine
数量,然后通过 runtime_Semacquire
阻塞当前 goroutine
,直到 WaitGroup
的计数器变为0,被 runtime_Semrelease
唤醒。
使用 WaitGroup 时的注意事项
避免重复使用未完成的 WaitGroup
在 Wait
方法返回之前,不应该再次调用 Add
方法来增加计数器的值,否则会导致 panic
。例如:
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("Goroutine started")
}()
wg.Wait()
// 这里再次调用 Add 是安全的,因为 Wait 已经返回
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("Another goroutine started")
}()
wg.Wait()
}
如果在 Wait
方法返回之前调用 Add
,如下所示:
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("Goroutine started")
}()
// 错误:在 Wait 之前再次调用 Add
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("Another goroutine started")
}()
wg.Wait()
}
这样会触发 panic
,因为在第一个 goroutine
还未完成时就尝试修改 WaitGroup
的状态。
确保所有 goroutine 都调用 Done
如果有 goroutine
忘记调用 Done
方法,Wait
方法将永远阻塞。例如:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
fmt.Printf("Worker %d started\n", id)
time.Sleep(time.Second)
// 错误:忘记调用 wg.Done()
}
func main() {
var wg sync.WaitGroup
numWorkers := 3
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, &wg)
}
wg.Wait()
fmt.Println("All workers have finished")
}
在这个例子中,worker
函数忘记调用 wg.Done()
,导致 main
函数中的 wg.Wait()
永远阻塞,程序无法正常结束。
正确处理错误情况
在 goroutine
执行过程中,如果发生错误,应该在适当的地方调用 Done
方法,以确保 WaitGroup
的计数器能够正确递减。例如:
package main
import (
"fmt"
"sync"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
if id == 2 {
fmt.Printf("Worker %d encountered an error\n", id)
return
}
fmt.Printf("Worker %d finished successfully\n", id)
}
func main() {
var wg sync.WaitGroup
numWorkers := 3
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, &wg)
}
wg.Wait()
fmt.Println("All workers have finished")
}
在这个例子中,worker
函数在遇到错误时依然通过 defer wg.Done()
来减少计数器的值,保证 main
函数中的 wg.Wait()
能够正确等待所有 goroutine
完成。
通过深入理解 WaitGroup
的实用场景、原理以及注意事项,我们能够在Go语言的并发编程中更加高效地使用它,编写出健壮、可靠的并发程序。无论是并行任务处理、批量文件处理还是并发数据库操作等场景,WaitGroup
都为我们提供了一种简洁而强大的同步机制。同时,在使用过程中遵循正确的使用方法和注意事项,能够避免常见的错误,确保程序的稳定性和正确性。