MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Go语言WaitGroup的实用场景与原理

2021-05-275.4k 阅读

Go语言并发编程基础

在深入探讨 WaitGroup 之前,我们先来回顾一下Go语言并发编程的基础概念。Go语言的并发编程模型基于 goroutinechannel

goroutine

goroutine 是Go语言中轻量级的线程执行单元。与传统线程相比,goroutine 的创建和销毁成本极低。在Go语言中,通过 go 关键字就可以轻松启动一个 goroutine。例如:

package main

import (
    "fmt"
    "time"
)

func printNumbers() {
    for i := 1; i <= 5; i++ {
        fmt.Println("Number:", i)
        time.Sleep(time.Millisecond * 500)
    }
}

func printLetters() {
    for i := 'a'; i <= 'e'; i++ {
        fmt.Println("Letter:", string(i))
        time.Sleep(time.Millisecond * 500)
    }
}

func main() {
    go printNumbers()
    go printLetters()

    time.Sleep(time.Second * 3)
}

在上述代码中,我们通过 go 关键字启动了两个 goroutine,分别执行 printNumbersprintLetters 函数。main 函数本身也是一个 goroutine。这里需要注意的是,main 函数中的 time.Sleep 是为了防止 main 函数过早退出,导致其他 goroutine 还未执行完毕就被终止。

channel

channel 是Go语言中用于在 goroutine 之间进行通信的机制,它可以看作是一个类型化的管道。通过 channel,不同的 goroutine 可以安全地传递数据。例如:

package main

import (
    "fmt"
)

func sendData(ch chan int) {
    for i := 1; i <= 5; i++ {
        ch <- i
    }
    close(ch)
}

func receiveData(ch chan int) {
    for num := range ch {
        fmt.Println("Received:", num)
    }
}

func main() {
    ch := make(chan int)

    go sendData(ch)
    go receiveData(ch)

    select {}
}

在这段代码中,我们创建了一个整型的 channelsendData 函数向 channel 中发送数据,receiveData 函数从 channel 中接收数据。for... range 结构在 channel 关闭时会自动退出循环。最后的 select {} 语句是为了防止 main 函数退出,保证 goroutine 有足够的时间执行。

WaitGroup 介绍

在Go语言的并发编程中,我们经常会遇到需要等待一组 goroutine 全部完成后再进行下一步操作的场景。例如,在一个Web爬虫程序中,可能会启动多个 goroutine 同时抓取不同网页的数据,只有当所有网页都抓取完毕后,才能对数据进行统一的分析和处理。这时,WaitGroup 就派上了用场。

WaitGroup 是Go标准库 sync 包中的一个类型,它提供了一种简单的机制来等待一组 goroutine 完成。WaitGroup 内部维护了一个计数器,通过调用 Add 方法来增加计数器的值,通过调用 Done 方法来减少计数器的值,通过调用 Wait 方法来阻塞当前 goroutine,直到计数器的值变为0。

WaitGroup 的使用方法

初始化 WaitGroup

在使用 WaitGroup 之前,需要先进行初始化。通常是在 main 函数或者其他需要等待 goroutine 完成的地方定义一个 WaitGroup 变量。例如:

var wg sync.WaitGroup

也可以使用 sync.WaitGroup 类型的指针,通过 new 关键字或者 & 取地址符来初始化:

wg := new(sync.WaitGroup)
// 或者
wg := &sync.WaitGroup{}

Add 方法

Add 方法用于增加 WaitGroup 内部计数器的值。一般在启动 goroutine 之前调用 Add 方法,参数为要启动的 goroutine 的数量。例如:

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d started\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d finished\n", id)
}

func main() {
    var wg sync.WaitGroup
    numWorkers := 3

    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }

    wg.Wait()
    fmt.Println("All workers have finished")
}

在上述代码中,我们在启动每个 worker goroutine 之前调用 wg.Add(1),表示有一个 goroutine 要执行。

Done 方法

Done 方法用于减少 WaitGroup 内部计数器的值,通常在 goroutine 完成任务后调用。一般会使用 defer 关键字来确保 Done 方法一定会被调用,即使 goroutine 执行过程中发生错误。例如在上面的 worker 函数中:

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d started\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d finished\n", id)
}

defer wg.Done() 会在 worker 函数返回时调用,将 WaitGroup 的计数器减1。

Wait 方法

Wait 方法用于阻塞当前 goroutine,直到 WaitGroup 内部计数器的值变为0。在需要等待所有 goroutine 完成的地方调用 Wait 方法。例如在上面的 main 函数中:

func main() {
    var wg sync.WaitGroup
    numWorkers := 3

    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }

    wg.Wait()
    fmt.Println("All workers have finished")
}

wg.Wait() 会阻塞 main goroutine,直到所有 worker goroutine 都调用了 wg.Done(),计数器变为0,然后 main goroutine 继续执行后续的代码。

WaitGroup 的实用场景

并行任务处理

在许多实际应用中,我们需要并行处理多个任务,然后等待所有任务完成后再进行统一的结果汇总或后续操作。例如,在一个数据分析程序中,可能需要从多个数据源获取数据,每个数据源的获取操作可以并行进行,当所有数据都获取完毕后,再进行数据的合并和分析。

package main

import (
    "fmt"
    "sync"
    "time"
)

func fetchData(source int, result *[]int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Fetching data from source %d\n", source)
    time.Sleep(time.Second)
    *result = append(*result, source*10)
}

func main() {
    var wg sync.WaitGroup
    data := make([]int, 0)
    numSources := 3

    for i := 1; i <= numSources; i++ {
        wg.Add(1)
        go fetchData(i, &data, &wg)
    }

    wg.Wait()
    fmt.Println("All data fetched:", data)
}

在这个例子中,我们模拟从多个数据源获取数据。每个 fetchData goroutine 从一个数据源获取数据(这里简单地返回数据源编号乘以10),并将结果添加到 data 切片中。main 函数通过 WaitGroup 等待所有数据获取完成后,输出最终的数据。

批量文件处理

假设我们需要对一批文件进行处理,例如压缩、加密等操作。为了提高效率,可以并行处理这些文件,然后等待所有文件处理完成后再进行下一步操作,比如记录处理日志。

package main

import (
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
    "sync"
)

func processFile(filePath string, wg *sync.WaitGroup) {
    defer wg.Done()
    data, err := ioutil.ReadFile(filePath)
    if err != nil {
        fmt.Printf("Error reading file %s: %v\n", filePath, err)
        return
    }
    // 这里可以进行文件处理操作,例如加密、压缩等
    newFilePath := filePath + ".processed"
    err = ioutil.WriteFile(newFilePath, data, 0644)
    if err != nil {
        fmt.Printf("Error writing processed file %s: %v\n", newFilePath, err)
    }
    fmt.Printf("Processed file %s\n", filePath)
}

func main() {
    var wg sync.WaitGroup
    files, err := filepath.Glob("*.txt")
    if err != nil {
        fmt.Printf("Error getting files: %v\n", err)
        return
    }

    for _, file := range files {
        wg.Add(1)
        go processFile(file, &wg)
    }

    wg.Wait()
    fmt.Println("All files processed")
}

在这个代码示例中,我们使用 filepath.Glob 获取所有符合条件的文件(这里假设是所有 .txt 文件),然后为每个文件启动一个 goroutine 进行处理。processFile 函数读取文件内容,然后可以在其中添加实际的文件处理逻辑(这里只是简单地将处理后的内容写入一个新文件)。main 函数通过 WaitGroup 等待所有文件处理完成后输出提示信息。

并发数据库操作

在一个数据库应用中,可能需要同时执行多个数据库查询操作,然后等待所有查询结果返回后进行汇总分析。

package main

import (
    "database/sql"
    "fmt"
    "sync"

    _ "github.com/lib/pq" // 这里假设使用PostgreSQL,实际根据需求更换
)

func queryDB(db *sql.DB, query string, result *[]string, wg *sync.WaitGroup) {
    defer wg.Done()
    rows, err := db.Query(query)
    if err != nil {
        fmt.Printf("Error querying database: %v\n", err)
        return
    }
    defer rows.Close()
    var data string
    for rows.Next() {
        err := rows.Scan(&data)
        if err != nil {
            fmt.Printf("Error scanning row: %v\n", err)
            continue
        }
        *result = append(*result, data)
    }
    if err := rows.Err(); err != nil {
        fmt.Printf("Error in rows: %v\n", err)
    }
}

func main() {
    db, err := sql.Open("postgres", "user=postgres dbname=mydb sslmode=disable")
    if err != nil {
        fmt.Printf("Error opening database: %v\n", err)
        return
    }
    defer db.Close()

    var wg sync.WaitGroup
    results := make([]string, 0)
    queries := []string{
        "SELECT column1 FROM table1",
        "SELECT column2 FROM table2",
    }

    for _, query := range queries {
        wg.Add(1)
        go queryDB(db, query, &results, &wg)
    }

    wg.Wait()
    fmt.Println("All queries completed. Results:", results)
}

在这个示例中,我们假设使用PostgreSQL数据库(实际应用中可根据具体数据库进行调整)。queryDB 函数执行一个数据库查询,并将结果添加到 results 切片中。main 函数为每个查询启动一个 goroutine,并通过 WaitGroup 等待所有查询完成后输出结果。

WaitGroup 的原理

内部结构

WaitGroup 的实现位于Go标准库的 src/sync/waitgroup.go 文件中。其内部结构定义如下:

// WaitGroup is a synchronization primitive used to wait for a collection of goroutines to finish.
// The main use of a WaitGroup is to wait for all the goroutines launched by a piece of code to complete.
// A WaitGroup must not be copied after first use.
type WaitGroup struct {
    noCopy noCopy

    // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
    // 64-bit atomic operations require 64-bit alignment, but 386 and arm
    // do not enforce it. The compiler will allocate 8 bytes anyway, so we
    // can just use the alignment.
    state1 [3]uint32
}

WaitGroup 内部使用一个 state1 数组来存储状态信息,其中高32位存储计数器的值,低32位存储等待的 goroutine 的数量。noCopy 字段用于防止 WaitGroup 被意外复制,因为复制 WaitGroup 可能会导致未定义行为。

Add 方法的实现

Add 方法的实现如下:

// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
func (wg *WaitGroup) Add(delta int) {
    statep, semap := wg.state()
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32)
    w := uint32(state)
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    if delta > 0 && v == int32(delta) {
        // The first increment after the counter was zero must not wake
        // any goroutines.  Avoids a race with Wait.
        return
    }
    if w != 0 && delta < 0 {
        // Decrementing waiters count shouldn't happen concurrently with
        // Wait.  This is useful for debugging, so we can detect
        // incorrect usage of WaitGroup and report a clearer error.
        panic("sync: WaitGroup is reused before previous Wait has returned")
    }
    if v > 0 || w == 0 {
        return
    }
    // Counter is zero, and waiters are pending.
    // Reset waiters count to 0.
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false, 0)
    }
}

Add 方法通过 atomic.AddUint64 原子操作来增加计数器的值。如果计数器变为0,并且有等待的 goroutine,则通过 runtime_Semrelease 释放所有等待的 goroutine。如果计数器变为负数,会触发 panic

Done 方法的实现

Done 方法实际上是 Add(-1) 的快捷方式,其实现如下:

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

这样,在 goroutine 完成任务后,通过调用 Done 方法就可以方便地减少计数器的值。

Wait 方法的实现

Wait 方法的实现如下:

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
    statep, semap := wg.state()
    for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        if v == 0 {
            // Counter is 0, no need to wait.
            return
        }
        // Increment waiters count.
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            runtime_Semacquire(semap)
            if *statep != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            return
        }
    }
}

Wait 方法通过 atomic.LoadUint64 原子操作加载当前的状态值。如果计数器为0,则直接返回。否则,通过 atomic.CompareAndSwapUint64 尝试增加等待的 goroutine 数量,然后通过 runtime_Semacquire 阻塞当前 goroutine,直到 WaitGroup 的计数器变为0,被 runtime_Semrelease 唤醒。

使用 WaitGroup 时的注意事项

避免重复使用未完成的 WaitGroup

Wait 方法返回之前,不应该再次调用 Add 方法来增加计数器的值,否则会导致 panic。例如:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        fmt.Println("Goroutine started")
    }()

    wg.Wait()
    // 这里再次调用 Add 是安全的,因为 Wait 已经返回
    wg.Add(1)
    go func() {
        defer wg.Done()
        fmt.Println("Another goroutine started")
    }()

    wg.Wait()
}

如果在 Wait 方法返回之前调用 Add,如下所示:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        fmt.Println("Goroutine started")
    }()

    // 错误:在 Wait 之前再次调用 Add
    wg.Add(1)
    go func() {
        defer wg.Done()
        fmt.Println("Another goroutine started")
    }()

    wg.Wait()
}

这样会触发 panic,因为在第一个 goroutine 还未完成时就尝试修改 WaitGroup 的状态。

确保所有 goroutine 都调用 Done

如果有 goroutine 忘记调用 Done 方法,Wait 方法将永远阻塞。例如:

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    fmt.Printf("Worker %d started\n", id)
    time.Sleep(time.Second)
    // 错误:忘记调用 wg.Done()
}

func main() {
    var wg sync.WaitGroup
    numWorkers := 3

    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }

    wg.Wait()
    fmt.Println("All workers have finished")
}

在这个例子中,worker 函数忘记调用 wg.Done(),导致 main 函数中的 wg.Wait() 永远阻塞,程序无法正常结束。

正确处理错误情况

goroutine 执行过程中,如果发生错误,应该在适当的地方调用 Done 方法,以确保 WaitGroup 的计数器能够正确递减。例如:

package main

import (
    "fmt"
    "sync"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    if id == 2 {
        fmt.Printf("Worker %d encountered an error\n", id)
        return
    }
    fmt.Printf("Worker %d finished successfully\n", id)
}

func main() {
    var wg sync.WaitGroup
    numWorkers := 3

    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }

    wg.Wait()
    fmt.Println("All workers have finished")
}

在这个例子中,worker 函数在遇到错误时依然通过 defer wg.Done() 来减少计数器的值,保证 main 函数中的 wg.Wait() 能够正确等待所有 goroutine 完成。

通过深入理解 WaitGroup 的实用场景、原理以及注意事项,我们能够在Go语言的并发编程中更加高效地使用它,编写出健壮、可靠的并发程序。无论是并行任务处理、批量文件处理还是并发数据库操作等场景,WaitGroup 都为我们提供了一种简洁而强大的同步机制。同时,在使用过程中遵循正确的使用方法和注意事项,能够避免常见的错误,确保程序的稳定性和正确性。