MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Go WaitGroup在复杂并发场景的应用

2022-05-253.7k 阅读

Go WaitGroup 基础概念

在 Go 语言的并发编程中,WaitGroup 是一个非常重要的同步工具。它的作用类似于一个计数器,用于等待一组 goroutine 完成它们的工作。WaitGroup 类型定义在 sync 包中,其结构体定义如下:

type WaitGroup struct {
    noCopy noCopy
    state1 [3]uint32
}

这个结构体的具体实现细节对于使用 WaitGroup 来说并不重要,但了解它的存在可以让我们知道 WaitGroup 内部是有状态的。

WaitGroup 主要有三个方法:AddDoneWait

  1. Add 方法:用于向 WaitGroup 计数器添加指定的增量。例如,如果我们要启动 3 个 goroutine 并等待它们完成,我们可以调用 wg.Add(3),这里 wg 是一个 WaitGroup 实例。Add 方法可以在任何时候调用,但通常在启动 goroutine 之前调用。如果在 Wait 方法已经开始等待后调用 Add,可能会导致程序死锁,因为等待的 goroutine 已经在等待计数器归零,而新增加的计数不会被等待的 goroutine 知晓。
package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        fmt.Println("goroutine is running")
    }()
    wg.Wait()
    fmt.Println("all goroutines are done")
}

在这个例子中,我们创建了一个 WaitGroup 实例 wg,通过 wg.Add(1) 增加计数,然后启动一个 goroutine。在 goroutine 中,我们使用 defer wg.Done() 来表示该 goroutine 工作完成,最后在主 goroutine 中调用 wg.Wait() 等待所有 goroutine 完成。

  1. Done 方法:其实它是 Add(-1) 的便捷写法,用于将 WaitGroup 计数器减 1。通常在 goroutine 的末尾调用 Done 方法,表示该 goroutine 已经完成了它的工作。它一般和 defer 关键字一起使用,这样即使 goroutine 发生 panic,计数器也能正确减 1。
  2. Wait 方法:该方法会阻塞调用它的 goroutine,直到 WaitGroup 计数器归零。也就是说,只有当所有通过 Add 方法增加的计数都通过 Done 方法减少到 0 时,Wait 才会返回,继续执行后续代码。

简单并发场景下的 WaitGroup

在简单的并发场景中,WaitGroup 的使用非常直观。例如,我们有多个独立的任务需要并发执行,然后等待所有任务完成后再进行下一步操作。

package main

import (
    "fmt"
    "sync"
    "time"
)

func task(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Task %d started\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Task %d finished\n", id)
}

func main() {
    var wg sync.WaitGroup
    numTasks := 3

    for i := 1; i <= numTasks; i++ {
        wg.Add(1)
        go task(i, &wg)
    }

    wg.Wait()
    fmt.Println("All tasks are completed")
}

在这个示例中,我们定义了一个 task 函数,每个 task 模拟了一个需要花费 1 秒时间完成的工作。在 main 函数中,我们通过循环启动 3 个 goroutine 来执行 task 函数,并使用 WaitGroup 来等待所有任务完成。这种场景下,WaitGroup 简单直接地帮助我们实现了并发任务的同步。

复杂并发场景下 WaitGroup 的应用

  1. 层级式并发任务 在一些复杂的业务场景中,任务可能呈现层级结构。例如,我们有一个主任务,它启动多个子任务,而每个子任务又可能启动更多的孙任务。
package main

import (
    "fmt"
    "sync"
    "time"
)

func subTask(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Sub - task %d started\n", id)
    var subWg sync.WaitGroup
    subWg.Add(2)

    go func() {
        defer subWg.Done()
        fmt.Printf("Grand - sub - task 1 of sub - task %d started\n", id)
        time.Sleep(time.Second)
        fmt.Printf("Grand - sub - task 1 of sub - task %d finished\n", id)
    }()

    go func() {
        defer subWg.Done()
        fmt.Printf("Grand - sub - task 2 of sub - task %d started\n", id)
        time.Sleep(time.Second)
        fmt.Printf("Grand - sub - task 2 of sub - task %d finished\n", id)
    }()

    subWg.Wait()
    fmt.Printf("Sub - task %d finished\n", id)
}

func main() {
    var wg sync.WaitGroup
    numSubTasks := 3

    for i := 1; i <= numSubTasks; i++ {
        wg.Add(1)
        go subTask(i, &wg)
    }

    wg.Wait()
    fmt.Println("All tasks are completed")
}

在这个例子中,subTask 函数模拟了子任务,每个子任务又启动了两个孙任务。我们通过在子任务内部创建新的 WaitGroupsubWg)来等待孙任务完成,同时在主任务中使用 wg 来等待所有子任务完成。这样,通过层级式地使用 WaitGroup,我们可以很好地管理复杂的层级并发任务。

  1. 并发任务依赖关系处理 有时候,并发任务之间存在依赖关系。例如,任务 B 需要在任务 A 完成后才能开始。我们可以使用 WaitGroup 结合通道(channel)来实现这种依赖关系。
package main

import (
    "fmt"
    "sync"
    "time"
)

func taskA(wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Println("Task A started")
    time.Sleep(time.Second)
    fmt.Println("Task A finished")
}

func taskB(wg *sync.WaitGroup, signal chan struct{}) {
    defer wg.Done()
    <-signal
    fmt.Println("Task B started")
    time.Sleep(time.Second)
    fmt.Println("Task B finished")
}

func main() {
    var wg sync.WaitGroup
    wg.Add(2)

    signal := make(chan struct{})

    go taskA(&wg)
    go taskB(&wg, signal)

    go func() {
        wg.Wait()
        close(signal)
    }()
}

在这个示例中,taskB 依赖于 taskA 的完成。我们通过一个无缓冲通道 signal 来传递任务 A 完成的信号。在 main 函数中,我们启动 taskAtaskB,同时使用一个匿名 goroutine 来等待 taskAtaskB 完成(通过 wg.Wait()),然后关闭通道 signal,这样 taskB 就可以接收到信号并开始执行。

  1. 动态创建和等待 goroutine 在某些场景下,我们可能需要根据运行时的条件动态地创建和等待 goroutine。
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    duration := time.Duration(rand.Intn(3)) * time.Second
    fmt.Printf("Worker %d started, will run for %v\n", id, duration)
    time.Sleep(duration)
    fmt.Printf("Worker %d finished\n", id)
}

func main() {
    var wg sync.WaitGroup
    numWorkers := rand.Intn(5) + 1

    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }

    wg.Wait()
    fmt.Println("All workers are done")
}

在这个例子中,我们使用 rand.Intn(5) + 1 动态生成需要启动的 goroutine 数量。每个 worker 函数模拟一个随机执行时间的任务。通过 WaitGroup,我们可以方便地等待所有动态创建的 goroutine 完成。

  1. 错误处理与 WaitGroup 在实际应用中,并发任务可能会出错。我们可以结合 WaitGroup 和错误处理机制来确保程序的健壮性。
package main

import (
    "errors"
    "fmt"
    "sync"
    "time"
)

var errTaskFailed = errors.New("task failed")

func task(id int, wg *sync.WaitGroup, errChan chan error) {
    defer wg.Done()
    if id == 2 {
        errChan <- errTaskFailed
        return
    }
    fmt.Printf("Task %d started\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Task %d finished\n", id)
}

func main() {
    var wg sync.WaitGroup
    numTasks := 3
    errChan := make(chan error, numTasks)

    for i := 1; i <= numTasks; i++ {
        wg.Add(1)
        go task(i, &wg, errChan)
    }

    go func() {
        wg.Wait()
        close(errChan)
    }()

    for err := range errChan {
        if err != nil {
            fmt.Printf("Error: %v\n", err)
        }
    }

    fmt.Println("All tasks processed")
}

在这个示例中,task 函数可能会返回错误。我们通过一个 errChan 通道来传递错误信息。在 main 函数中,我们启动多个任务,并使用 WaitGroup 等待所有任务完成。当所有任务完成后,我们关闭 errChan 通道,并通过 for... range 循环读取通道中的错误信息进行处理。

WaitGroup 使用中的注意事项

  1. 避免重复等待 不要在同一个 WaitGroup 上多次调用 Wait 方法,除非你明确知道自己在做什么。例如,在一个函数中调用 wg.Wait() 后,又在另一个地方再次调用 wg.Wait(),这可能会导致意外的阻塞行为,因为第一次 Wait 已经阻塞到计数器归零,第二次调用可能会永远阻塞。
  2. 正确使用 Add 和 Done 确保 AddDone 的调用次数匹配。如果 Add 的次数多于 DoneWait 方法将永远阻塞;反之,如果 Done 的次数多于 Add,可能会导致运行时恐慌(panic)。
  3. 防止死锁 在并发编程中,死锁是一个常见的问题。当在 Wait 方法已经开始等待后调用 Add,或者 Add 的次数与 Done 不匹配导致 Wait 永远阻塞时,都可能发生死锁。仔细设计并发逻辑,确保 WaitGroup 的使用正确无误可以避免死锁。

与其他同步工具的结合使用

  1. WaitGroup 与 Mutex 在一些场景下,我们可能需要保护共享资源,同时等待一组 goroutine 完成。这时可以结合 WaitGroupMutex
package main

import (
    "fmt"
    "sync"
)

type Data struct {
    value int
    mu    sync.Mutex
}

func worker(id int, data *Data, wg *sync.WaitGroup) {
    defer wg.Done()
    data.mu.Lock()
    data.value += id
    data.mu.Unlock()
    fmt.Printf("Worker %d updated data value to %d\n", id, data.value)
}

func main() {
    var wg sync.WaitGroup
    numWorkers := 3
    data := Data{}

    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, &data, &wg)
    }

    wg.Wait()
    fmt.Printf("Final data value: %d\n", data.value)
}

在这个例子中,多个 worker goroutine 需要修改共享的 Data 结构体中的 value 字段。我们使用 Mutex 来保护对 value 的并发访问,同时使用 WaitGroup 等待所有 worker 完成任务。

  1. WaitGroup 与 Channel 我们前面已经看到了一些结合 WaitGroup 和通道的示例。通道可以用于在 goroutine 之间传递数据和信号,而 WaitGroup 用于等待一组 goroutine 完成。它们的结合可以实现非常复杂的并发控制逻辑。例如,我们可以使用通道来分发任务,然后使用 WaitGroup 等待所有任务处理完成。
package main

import (
    "fmt"
    "sync"
)

func worker(taskChan <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for task := range taskChan {
        fmt.Printf("Processing task %d\n", task)
    }
}

func main() {
    var wg sync.WaitGroup
    numWorkers := 3
    taskChan := make(chan int)

    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(taskChan, &wg)
    }

    for i := 1; i <= 5; i++ {
        taskChan <- i
    }
    close(taskChan)

    wg.Wait()
    fmt.Println("All tasks are processed")
}

在这个示例中,我们通过 taskChan 通道向多个 worker goroutine 分发任务。每个 worker 从通道中读取任务并处理,直到通道关闭。WaitGroup 用于等待所有 worker 完成任务。

通过以上详细的介绍和丰富的代码示例,我们深入了解了 Go 语言中 WaitGroup 在复杂并发场景下的应用。掌握 WaitGroup 的正确使用方法,结合其他同步工具,可以帮助我们编写高效、健壮的并发程序。无论是层级式任务、任务依赖关系处理,还是动态创建 goroutine 等场景,WaitGroup 都能发挥重要作用,同时在使用过程中注意避免常见的问题,如死锁、重复等待等,确保程序的正确性和稳定性。