MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Go WaitGroup在复杂并发场景的灵活运用

2022-07-107.0k 阅读

Go WaitGroup基础概念

在Go语言的并发编程中,WaitGroup是一个非常重要的同步工具。它的主要作用是等待一组 goroutine 完成各自的任务。简单来说,WaitGroup 就像是一个计数器,通过它可以协调多个 goroutine 的执行,确保在所有相关的 goroutine 都完成工作之前,主程序不会提前退出。

WaitGroup的结构和原理

WaitGroup 结构体定义在Go标准库的 sync 包中,其内部包含一个计数器,该计数器记录着需要等待完成的 goroutine 数量。当我们启动一个新的 goroutine 时,可以调用 WaitGroupAdd 方法来增加计数器的值;当一个 goroutine 完成任务时,调用 Done 方法来减少计数器的值;而主程序(或者其他等待的 goroutine)可以调用 Wait 方法来阻塞,直到计数器的值变为 0,即所有相关的 goroutine 都已完成任务。

简单示例

以下是一个简单的示例,展示了如何使用 WaitGroup 来等待多个 goroutine 完成工作:

package main

import (
    "fmt"
    "sync"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d starting\n", id)
    // 模拟一些工作
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup
    numWorkers := 3

    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }

    wg.Wait()
    fmt.Println("All workers have finished")
}

在这个示例中,我们创建了一个 WaitGroup 实例 wg。然后,我们通过循环启动了三个 goroutine,每个 goroutine 在启动前调用 wg.Add(1) 来增加计数器的值。在 worker 函数中,使用 defer wg.Done() 确保函数结束时计数器减 1。最后,在 main 函数中调用 wg.Wait(),主程序会阻塞在这里,直到所有三个 goroutine 都调用了 wg.Done(),计数器变为 0 才会继续执行,打印出 “All workers have finished”。

复杂并发场景下的运用

多阶段并发任务

在实际开发中,经常会遇到多阶段的并发任务。例如,一个任务可能分为数据获取、数据处理和结果存储三个阶段,每个阶段都可以并发执行,但必须按照顺序依次完成。

package main

import (
    "fmt"
    "sync"
)

func fetchData(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Fetching data for task %d\n", id)
    // 模拟数据获取操作
}

func processData(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Processing data for task %d\n", id)
    // 模拟数据处理操作
}

func storeData(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Storing data for task %d\n", id)
    // 模拟数据存储操作
}

func main() {
    var wg1, wg2, wg3 sync.WaitGroup
    numTasks := 3

    for i := 0; i < numTasks; i++ {
        wg1.Add(1)
        go fetchData(i, &wg1)
    }

    wg1.Wait()

    for i := 0; i < numTasks; i++ {
        wg2.Add(1)
        go processData(i, &wg2)
    }

    wg2.Wait()

    for i := 0; i < numTasks; i++ {
        wg3.Add(1)
        go storeData(i, &wg3)
    }

    wg3.Wait()

    fmt.Println("All tasks have been completed")
}

在这个示例中,我们使用了三个 WaitGroup 来分别管理数据获取、数据处理和结果存储三个阶段。首先,启动所有的数据获取 goroutine,通过 wg1.Wait() 等待它们全部完成。然后,启动数据处理 goroutine,等待 wg2 计数为 0 表示处理完成。最后,启动结果存储 goroutine,等待 wg3 计数为 0 表示存储完成。这样就实现了多阶段并发任务的有序执行。

嵌套并发任务

有时候,在一个 goroutine 内部还会启动多个子 goroutine,形成嵌套的并发结构。WaitGroup 在这种情况下同样可以发挥重要作用。

package main

import (
    "fmt"
    "sync"
)

func innerWorker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Inner worker %d starting\n", id)
    // 模拟一些工作
    fmt.Printf("Inner worker %d done\n", id)
}

func outerWorker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Outer worker %d starting\n", id)

    var innerWg sync.WaitGroup
    numInnerWorkers := 2

    for i := 0; i < numInnerWorkers; i++ {
        innerWg.Add(1)
        go innerWorker(i, &innerWg)
    }

    innerWg.Wait()
    fmt.Printf("Outer worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup
    numOuterWorkers := 3

    for i := 0; i < numOuterWorkers; i++ {
        wg.Add(1)
        go outerWorker(i, &wg)
    }

    wg.Wait()
    fmt.Println("All outer workers have finished")
}

在这个例子中,outerWorker 函数代表外层 goroutine,它内部启动了多个 innerWorker 子 goroutine。每个 outerWorker 都有自己的 innerWg 来等待其内部的子 goroutine 完成。主程序通过 wg 来等待所有的 outerWorker 完成工作。这种嵌套结构在处理复杂的业务逻辑时非常常见,比如在一个批量处理任务中,每个任务又包含多个子任务。

动态增减 goroutine

在某些场景下,需要根据运行时的条件动态地启动或停止 goroutine。WaitGroup 也可以适应这种动态变化。

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d starting\n", id)
    for {
        select {
        case <-time.After(time.Second):
            fmt.Printf("Worker %d doing some work\n", id)
        }
    }
}

func main() {
    var wg sync.WaitGroup
    var numWorkers int

    for i := 0; i < 3; i++ {
        wg.Add(1)
        go worker(i, &wg)
        numWorkers++
    }

    go func() {
        time.Sleep(3 * time.Second)
        for i := numWorkers; i < 5; i++ {
            wg.Add(1)
            go worker(i, &wg)
            numWorkers++
        }
    }()

    go func() {
        time.Sleep(5 * time.Second)
        // 这里假设可以通过某种方式通知某些 goroutine 停止
        // 例如通过 channel 发送信号
        fmt.Println("Stopping some workers")
        // 这里简单模拟减少两个 goroutine 的计数
        for i := 0; i < 2; i++ {
            wg.Done()
            numWorkers--
        }
    }()

    wg.Wait()
    fmt.Println("All workers have finished")
}

在这个示例中,我们首先启动了三个 goroutine。然后,在一个单独的 goroutine 中,延迟 3 秒后又启动了两个新的 goroutine。接着,另一个 goroutine 在延迟 5 秒后模拟停止两个 goroutine(通过直接调用 wg.Done() 减少计数器)。虽然在实际应用中,停止 goroutine 通常需要更复杂的机制,比如通过 channel 发送停止信号,但这个示例展示了如何动态地增加和减少 WaitGroup 的计数,以适应动态的 goroutine 数量变化。

错误处理与 WaitGroup

在并发任务中,错误处理是一个关键问题。当某个 goroutine 发生错误时,我们可能需要及时停止其他正在运行的 goroutine,并将错误信息传递出来。

package main

import (
    "errors"
    "fmt"
    "sync"
)

var errSomeError = errors.New("some error occurred")

func worker(id int, wg *sync.WaitGroup, errChan chan error) {
    defer wg.Done()
    fmt.Printf("Worker %d starting\n", id)
    // 模拟可能发生错误的操作
    if id == 1 {
        errChan <- errSomeError
        return
    }
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup
    numWorkers := 3
    errChan := make(chan error, 1)

    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go worker(i, &wg, errChan)
    }

    go func() {
        wg.Wait()
        close(errChan)
    }()

    for err := range errChan {
        if err != nil {
            fmt.Println("Error:", err)
            // 这里可以根据需要停止其他 goroutine
            // 例如通过向一个停止信号 channel 发送信号
            break
        }
    }
    fmt.Println("All workers have been processed")
}

在这个示例中,每个 worker 函数如果发生错误(这里简单地以 id == 1 作为错误发生的条件),就会将错误信息发送到 errChan 中。主程序通过 for... rangeerrChan 中接收错误信息,如果接收到错误,就打印错误并可以根据需要停止其他正在运行的 goroutine。同时,通过一个单独的 goroutine 调用 wg.Wait() 并在完成后关闭 errChan,确保 for... range 循环能够正常结束。

资源池与 WaitGroup

在高并发场景下,资源池是一种常见的优化手段。例如,数据库连接池、HTTP 客户端连接池等。WaitGroup 可以与资源池结合,有效地管理资源的使用和释放。

package main

import (
    "fmt"
    "sync"
    "time"
)

type Resource struct {
    // 这里可以定义资源相关的属性,比如数据库连接
}

func NewResource() *Resource {
    // 初始化资源,例如建立数据库连接
    return &Resource{}
}

func (r *Resource) Release() {
    // 释放资源,例如关闭数据库连接
}

type ResourcePool struct {
    resources chan *Resource
    wg        sync.WaitGroup
}

func NewResourcePool(capacity int) *ResourcePool {
    pool := &ResourcePool{
        resources: make(chan *Resource, capacity),
    }
    for i := 0; i < capacity; i++ {
        pool.resources <- NewResource()
    }
    return pool
}

func (p *ResourcePool) GetResource() *Resource {
    p.wg.Add(1)
    return <-p.resources
}

func (p *ResourcePool) ReleaseResource(r *Resource) {
    defer p.wg.Done()
    p.resources <- r
}

func worker(id int, pool *ResourcePool) {
    resource := pool.GetResource()
    defer pool.ReleaseResource(resource)

    fmt.Printf("Worker %d using resource\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d released resource\n", id)
}

func main() {
    pool := NewResourcePool(2)
    numWorkers := 5

    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            worker(id, pool)
        }(i)
    }

    wg.Wait()
    pool.wg.Wait()
    close(pool.resources)
    fmt.Println("All workers have finished")
}

在这个示例中,我们定义了一个 ResourcePool 结构体来管理资源池。GetResource 方法从资源池中获取一个资源,并增加 WaitGroup 的计数,ReleaseResource 方法在使用完资源后将其放回资源池,并减少 WaitGroup 的计数。每个 worker 函数获取资源、使用资源,然后释放资源。主程序通过两个 WaitGroup,一个用于等待所有的 worker 完成工作,另一个用于等待所有的资源都被正确释放。这样可以确保在程序结束时,所有的资源都被妥善处理。

注意事项与优化

避免死锁

在使用 WaitGroup 时,最常见的问题之一就是死锁。死锁通常发生在以下几种情况:

  1. 忘记调用 Add 方法:如果在启动 goroutine 之前没有调用 Add 方法来增加计数器,而直接调用 Wait 方法,就会导致主程序永远阻塞,因为计数器永远不会达到 0。
package main

import (
    "fmt"
    "sync"
)

func worker(wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Println("Worker working")
}

func main() {
    var wg sync.WaitGroup
    go worker(&wg)
    wg.Wait() // 这里会导致死锁,因为没有调用 wg.Add(1)
    fmt.Println("All done")
}
  1. AddDone 不匹配:如果调用 Add 的次数多于 Done 的次数,Wait 方法也会永远阻塞。反之,如果 Done 的次数多于 Add 的次数,会导致运行时错误。
package main

import (
    "fmt"
    "sync"
)

func worker(wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Println("Worker working")
}

func main() {
    var wg sync.WaitGroup
    wg.Add(2) // 增加两次计数
    go worker(&wg)
    wg.Wait() // 这里会导致死锁,因为只启动了一个 goroutine,只调用一次 wg.Done()
    fmt.Println("All done")
}
  1. 嵌套 WaitGroup 死锁:在嵌套使用 WaitGroup 时,如果处理不当,也可能导致死锁。例如,外层 WaitGroup 等待内层 WaitGroup,而内层 WaitGroup 又等待外层 WaitGroup,形成循环等待。
package main

import (
    "fmt"
    "sync"
)

func inner(wg *sync.WaitGroup, outerWg *sync.WaitGroup) {
    defer wg.Done()
    outerWg.Wait() // 内层等待外层
    fmt.Println("Inner working")
}

func outer(wg *sync.WaitGroup, innerWg *sync.WaitGroup) {
    defer wg.Done()
    innerWg.Wait() // 外层等待内层
    fmt.Println("Outer working")
}

func main() {
    var outerWg, innerWg sync.WaitGroup
    outerWg.Add(1)
    innerWg.Add(1)

    go inner(&innerWg, &outerWg)
    go outer(&outerWg, &innerWg)

    // 这里两个 goroutine 相互等待,导致死锁
}

性能优化

  1. 减少不必要的同步:虽然 WaitGroup 是一个强大的同步工具,但过多的同步操作会影响性能。在设计并发任务时,尽量减少对 WaitGroup 的依赖,只有在真正需要等待一组 goroutine 完成时才使用它。

  2. 批量操作 Add:如果需要启动大量的 goroutine,可以一次性调用 Add 方法增加相应的计数,而不是在每个 goroutine 启动前单独调用 Add。这样可以减少同步开销。

package main

import (
    "fmt"
    "sync"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d working\n", id)
}

func main() {
    var wg sync.WaitGroup
    numWorkers := 1000

    wg.Add(numWorkers) // 一次性增加计数
    for i := 0; i < numWorkers; i++ {
        go worker(i, &wg)
    }

    wg.Wait()
    fmt.Println("All workers have finished")
}
  1. 合理设置 WaitGroup 的作用域:确保 WaitGroup 的生命周期与需要等待的 goroutine 紧密相关。避免在不必要的地方长时间持有 WaitGroup,导致资源浪费或影响程序的可扩展性。

总结

WaitGroup 是Go语言并发编程中不可或缺的工具,在复杂的并发场景下,它能够帮助我们有效地协调多个 goroutine 的执行,实现多阶段任务、嵌套任务、动态增减 goroutine 等复杂功能。同时,我们也要注意避免死锁等常见问题,并通过合理的优化手段提高程序的性能。掌握 WaitGroup 的灵活运用,对于编写高效、健壮的Go并发程序至关重要。通过不断实践和深入理解,我们能够更好地利用Go语言的并发特性,开发出更强大的应用程序。