MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

WaitGroup在并发控制中的核心作用

2022-12-044.0k 阅读

Go 语言并发编程基础

在深入探讨 WaitGroup 在并发控制中的核心作用之前,我们先来回顾一下 Go 语言并发编程的基础概念。Go 语言以其轻量级的线程模型——goroutine,为开发者提供了高效的并发编程能力。一个 goroutine 是一个与其他函数或方法并发运行的函数或方法。它类似于线程,但开销极小,可以轻松创建成千上万的 goroutine。

例如,下面是一个简单的创建和运行 goroutine 的示例:

package main

import (
    "fmt"
    "time"
)

func printNumbers() {
    for i := 1; i <= 5; i++ {
        fmt.Println("Number:", i)
        time.Sleep(time.Millisecond * 500)
    }
}

func printLetters() {
    for i := 'a'; i <= 'e'; i++ {
        fmt.Println("Letter:", string(i))
        time.Sleep(time.Millisecond * 500)
    }
}

func main() {
    go printNumbers()
    go printLetters()

    time.Sleep(time.Second * 3)
}

在上述代码中,我们通过 go 关键字分别启动了两个 goroutine,printNumbersprintLetters。这两个 goroutine 会并发执行。然而,这种简单的并发模式存在一个问题,在 main 函数中,我们使用 time.Sleep 来等待两个 goroutine 执行完毕。但这种方式既不优雅也不可靠,因为我们很难精确地确定需要等待多长时间。如果睡眠时间设置过短,可能 goroutine 还未执行完就结束了程序;如果设置过长,则会浪费不必要的时间。这就引出了 WaitGroup 在并发控制中的重要性。

WaitGroup 概述

WaitGroup 是 Go 语言标准库 sync 包中的一个类型,它提供了一种机制,用于等待一组 goroutine 完成其任务。WaitGroup 内部维护一个计数器,通过调用 Add 方法增加计数器的值,调用 Done 方法减少计数器的值,调用 Wait 方法阻塞当前 goroutine,直到计数器的值变为零。

WaitGroup 的结构

在 Go 语言的源码中,WaitGroup 的定义如下:

// src/sync/waitgroup.go
type WaitGroup struct {
    noCopy noCopy
    state1 [3]uint32
}

state1 是一个包含三个 uint32 类型元素的数组,它用来存储 WaitGroup 的状态,包括计数器的值和等待队列的信息。noCopy 是一个用于防止 WaitGroup 被复制的结构体,通过在 WaitGroup 中嵌入 noCopy 结构体,Go 编译器会在编译时检测到 WaitGroup 的复制操作,并报错,以避免因复制 WaitGroup 而导致的未定义行为。

WaitGroup 的方法

  1. Add(delta int):该方法用于增加 WaitGroup 计数器的值。delta 参数为增加的数值,可以是正数也可以是负数,但通常我们传入正数。如果传入负数,计数器的值会减少,但必须保证在调用 Wait 方法之前,计数器的值不会小于零,否则会导致运行时错误。
  2. Done():该方法是 Add(-1) 的快捷方式,用于将 WaitGroup 计数器的值减一。通常在一个 goroutine 完成其任务后调用此方法。
  3. Wait():该方法会阻塞调用它的 goroutine,直到 WaitGroup 计数器的值变为零。

WaitGroup 在并发控制中的核心作用

协调多个 goroutine 的完成

在实际的并发编程中,我们常常需要启动多个 goroutine 并行执行任务,然后等待所有 goroutine 都完成后再继续后续的操作。WaitGroup 提供了一种简洁而有效的方式来实现这一点。

例如,假设我们有一个任务,需要从多个数据源获取数据,然后对这些数据进行汇总分析。我们可以为每个数据源启动一个 goroutine 来获取数据,使用 WaitGroup 来等待所有数据获取完成后再进行汇总。

package main

import (
    "fmt"
    "sync"
)

func fetchData(source int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Fetching data from source %d\n", source)
    // 模拟数据获取操作
    // 这里可以是实际的网络请求或数据库查询等
    // 为了简单起见,我们使用下面的空循环来模拟
    for i := 0; i < 1000000000; i++ {
        continue
    }
    fmt.Printf("Data from source %d fetched\n", source)
}

func main() {
    var wg sync.WaitGroup
    numSources := 3

    for i := 1; i <= numSources; i++ {
        wg.Add(1)
        go fetchData(i, &wg)
    }

    wg.Wait()
    fmt.Println("All data fetched, starting analysis...")
    // 进行数据汇总分析的代码
}

在上述代码中,我们通过 for 循环启动了三个 goroutine 来从不同的数据源获取数据。每个 goroutine 在启动前,我们调用 wg.Add(1) 增加 WaitGroup 的计数器。在 fetchData 函数中,我们使用 defer wg.Done() 确保在 goroutine 结束时将计数器减一。最后,在 main 函数中调用 wg.Wait()main 函数会阻塞在这里,直到所有三个 goroutine 都调用了 wg.Done(),即所有数据都获取完成,然后才会继续执行后续的数据分析操作。

避免竞态条件

竞态条件是并发编程中常见的问题,当多个 goroutine 同时访问和修改共享资源时,可能会导致数据不一致或未定义行为。WaitGroup 虽然不能直接防止竞态条件,但它可以通过控制 goroutine 的执行顺序和等待所有相关 goroutine 完成,从而帮助我们更好地处理共享资源。

例如,考虑一个简单的计数器程序,多个 goroutine 同时对一个计数器进行累加操作。如果不进行适当的同步控制,就会出现竞态条件。

package main

import (
    "fmt"
    "sync"
)

var counter int

func increment(wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < 1000; i++ {
        counter++
    }
}

func main() {
    var wg sync.WaitGroup
    numGoroutines := 10

    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go increment(&wg)
    }

    wg.Wait()
    fmt.Println("Final counter value:", counter)
}

在这个例子中,虽然 WaitGroup 本身并不能防止竞态条件,但它确保了所有的 increment goroutine 都完成了对 counter 的累加操作后,才输出最终的计数器值。如果没有 WaitGroupmain 函数可能在部分 goroutine 还未完成累加操作时就输出计数器值,导致结果不准确。

为了真正防止竞态条件,我们还需要使用其他同步机制,如互斥锁(sync.Mutex)。下面是改进后的代码:

package main

import (
    "fmt"
    "sync"
)

var counter int
var mu sync.Mutex

func increment(wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < 1000; i++ {
        mu.Lock()
        counter++
        mu.Unlock()
    }
}

func main() {
    var wg sync.WaitGroup
    numGoroutines := 10

    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go increment(&wg)
    }

    wg.Wait()
    fmt.Println("Final counter value:", counter)
}

通过使用 sync.Mutex,我们在每次对 counter 进行修改时,先获取锁,修改完成后再释放锁,从而避免了竞态条件。WaitGroupsync.Mutex 等同步机制结合使用,可以更有效地处理并发编程中的各种问题。

控制并发度

在某些场景下,我们可能不希望同时启动过多的 goroutine,以免消耗过多的系统资源。WaitGroup 可以与其他机制结合来控制并发度。

例如,假设我们需要下载一组图片,但为了避免对服务器造成过大压力,我们希望同时最多只能有三个下载任务在进行。我们可以使用一个带缓冲的通道来限制并发度,同时结合 WaitGroup 来等待所有下载任务完成。

package main

import (
    "fmt"
    "sync"
)

func downloadImage(url string, sem chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()
    sem <- struct{}{}
    defer func() { <-sem }()

    fmt.Printf("Downloading image from %s\n", url)
    // 模拟图片下载操作
    // 这里可以是实际的网络下载代码
    // 为了简单起见,我们使用下面的空循环来模拟
    for i := 0; i < 1000000000; i++ {
        continue
    }
    fmt.Printf("Image from %s downloaded\n", url)
}

func main() {
    var wg sync.WaitGroup
    urls := []string{
        "http://example.com/image1.jpg",
        "http://example.com/image2.jpg",
        "http://example.com/image3.jpg",
        "http://example.com/image4.jpg",
        "http://example.com/image5.jpg",
    }
    sem := make(chan struct{}, 3)

    for _, url := range urls {
        wg.Add(1)
        go downloadImage(url, sem, &wg)
    }

    wg.Wait()
    fmt.Println("All images downloaded")
}

在上述代码中,我们创建了一个带缓冲的通道 sem,其缓冲大小为 3,这意味着最多同时有三个 goroutine 可以进入下载操作。每个 downloadImage 函数在开始下载前,先向 sem 通道发送一个信号,下载完成后再从 sem 通道接收一个信号,这样就保证了同时进行的下载任务不会超过三个。WaitGroup 则用于等待所有的下载任务完成。

处理父子 goroutine 关系

在复杂的并发程序中,经常会出现一个 goroutine 启动多个子 goroutine 的情况。WaitGroup 可以很好地处理这种父子 goroutine 的关系,确保父 goroutine 等待所有子 goroutine 完成后再继续执行。

例如,假设我们有一个任务,需要遍历一个目录树,每个目录作为一个子任务由一个 goroutine 处理。主 goroutine 启动这些子 goroutine 后,需要等待所有子 goroutine 完成遍历才能结束。

package main

import (
    "fmt"
    "io/fs"
    "os"
    "path/filepath"
    "sync"
)

func walkDir(dir string, wg *sync.WaitGroup) {
    defer wg.Done()
    files, err := os.ReadDir(dir)
    if err != nil {
        fmt.Printf("Error reading directory %s: %v\n", dir, err)
        return
    }

    var subWG sync.WaitGroup
    for _, file := range files {
        if file.IsDir() {
            subDir := filepath.Join(dir, file.Name())
            subWG.Add(1)
            go walkDir(subDir, &subWG)
        }
    }

    subWG.Wait()
    fmt.Printf("Finished walking directory %s\n", dir)
}

func main() {
    rootDir := "."
    var wg sync.WaitGroup
    wg.Add(1)
    go walkDir(rootDir, &wg)
    wg.Wait()
    fmt.Println("All directories walked")
}

在这个例子中,walkDir 函数负责遍历一个目录。对于每个子目录,它启动一个新的 goroutine 来进行遍历,并使用一个内部的 WaitGroupsubWG)来等待所有子目录的遍历完成。主 goroutine 通过外部的 WaitGroupwg)等待整个目录树的遍历完成。这种方式可以有效地处理父子 goroutine 的层次结构,确保程序在所有相关任务完成后才结束。

WaitGroup 使用的注意事项

正确使用 Add 方法

在使用 Add 方法时,要确保增加的数值与后续调用 Done 方法减少的数值相匹配。如果调用 Wait 方法时计数器的值不为零,调用 Wait 的 goroutine 会一直阻塞。同时,要避免在计数器已经为零的情况下再次调用 Add 方法增加计数器的值,因为这可能会导致 Wait 方法永远不会返回。

例如,下面的代码就是一个错误的示例:

package main

import (
    "fmt"
    "sync"
)

func wrongUsage() {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        fmt.Println("Goroutine is running")
        wg.Done()
    }()

    wg.Wait()
    // 错误:在计数器已经为零的情况下再次调用 Add
    wg.Add(1)
    fmt.Println("This line should not be reached")
}

在上述代码中,wg.Add(1)wg.Wait() 之后调用,这是不正确的,因为此时计数器已经为零,再次调用 Add 可能会导致后续逻辑出现问题。

避免在 goroutine 内部调用 Add

虽然在理论上可以在 goroutine 内部调用 Add 方法,但这可能会导致死锁。因为如果在一个 goroutine 中调用 Add 增加计数器的值,而这个 goroutine 又在等待 WaitGroup 计数器归零(例如通过调用 Wait 方法),就会出现死锁。

例如,下面是一个可能导致死锁的代码示例:

package main

import (
    "fmt"
    "sync"
)

func potentiallyDeadlock() {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        // 错误:在 goroutine 内部调用 Add
        wg.Add(1)
        fmt.Println("Goroutine is running")
        wg.Done()
        wg.Wait()
    }()

    wg.Wait()
}

在这个例子中,内部 goroutine 调用 wg.Add(1) 增加计数器的值,同时又调用 wg.Wait() 等待计数器归零,这就导致了死锁。

小心使用 defer Done

在使用 defer wg.Done() 时要确保函数不会提前返回。如果函数在 defer wg.Done() 之前因为错误或其他原因提前返回,就会导致 WaitGroup 计数器没有正确减少,从而使 Wait 方法永远阻塞。

例如,下面是一个可能出现问题的代码示例:

package main

import (
    "fmt"
    "sync"
)

func mightNotCallDone(wg *sync.WaitGroup) {
    // 错误:可能不会调用 defer wg.Done()
    if someCondition() {
        return
    }
    defer wg.Done()
    fmt.Println("Function is doing some work")
}

func someCondition() bool {
    // 这里返回一个随机的布尔值,模拟可能的条件判断
    return true
}

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go mightNotCallDone(&wg)
    wg.Wait()
    fmt.Println("All work should be done")
}

在上述代码中,如果 someCondition() 返回 truemightNotCallDone 函数会提前返回,导致 defer wg.Done() 不会被执行,wg.Wait() 会永远阻塞。为了避免这种情况,应该在函数开始时就调用 defer wg.Done(),并在可能提前返回的地方进行适当的错误处理或逻辑调整。

总结

WaitGroup 是 Go 语言并发编程中一个非常重要的工具,它在协调多个 goroutine 的完成、避免竞态条件、控制并发度以及处理父子 goroutine 关系等方面都发挥着核心作用。正确使用 WaitGroup 可以使我们的并发程序更加健壮、高效。在使用 WaitGroup 时,需要注意正确调用其方法,避免常见的错误,如错误使用 Add 方法、在 goroutine 内部不当调用 Add 以及小心使用 defer Done 等。通过合理运用 WaitGroup 以及结合其他同步机制,我们能够编写出高质量的并发程序,充分发挥 Go 语言的并发优势。