MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Go WaitGroup的并发编程规范

2022-01-303.1k 阅读

Go WaitGroup简介

在Go语言的并发编程模型中,WaitGroup是一个非常重要的同步原语。它用于等待一组Go协程(goroutine)完成任务。WaitGroup的设计理念基于计数器,通过增加和减少计数器的值来跟踪协程的执行状态,当计数器的值归零,意味着所有相关的协程都已完成任务。

WaitGroup结构体定义在sync包中,使用时通常通过Add方法增加计数器的值,每个需要等待完成的协程开始时对应一次Add操作,协程结束时调用Done方法减少计数器的值,而主协程(或其他需要等待的协程)通过调用Wait方法阻塞,直到计数器的值变为零。

使用场景

  1. 多任务并行处理:当需要并行执行多个独立任务,并在所有任务完成后进行下一步操作时,WaitGroup就派上用场了。例如,从多个不同的数据源获取数据,然后对这些数据进行整合分析。
  2. 资源初始化与清理:在启动多个协程进行资源初始化时,使用WaitGroup确保所有资源初始化完成后再继续后续操作。同样,在程序结束时,通过WaitGroup等待所有资源清理协程完成,保证资源的正确释放。

基本用法

下面通过一个简单的代码示例来展示WaitGroup的基本使用:

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup
    numWorkers := 3

    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }

    wg.Wait()
    fmt.Println("All workers have finished")
}

在上述代码中:

  • worker函数模拟一个工作协程,它接收一个idWaitGroup指针作为参数。在函数开始时通过defer wg.Done()确保函数结束时计数器减1。
  • main函数中,首先创建一个WaitGroup实例。然后通过循环启动numWorkers个协程,每次启动前调用wg.Add(1)增加计数器。最后调用wg.Wait()等待所有协程完成。

WaitGroup方法详解

  1. Add方法
    • 功能Add方法用于增加WaitGroup计数器的值。它接受一个整数参数delta,通常delta为1,表示增加一个需要等待的协程。如果delta为负数,则表示减少计数器的值,但这种情况需谨慎使用,因为可能导致计数器出现负数,引发未定义行为。
    • 使用注意事项Add方法应该在启动协程之前调用,确保计数器正确反映需要等待的协程数量。如果在协程已经开始执行后调用Add,可能会导致竞态条件,因为协程可能在Add调用之前就完成并调用Done,使得计数器的值不准确。
    • 示例
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    // 启动5个协程
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            fmt.Printf("协程 %d 开始工作\n", id)
            time.Sleep(time.Second)
            fmt.Printf("协程 %d 工作完成\n", id)
        }(i)
    }
    wg.Wait()
    fmt.Println("所有协程已完成")
}
  1. Done方法
    • 功能Done方法是Add(-1)的便捷形式,用于减少WaitGroup计数器的值,通常在协程完成任务时调用。它内部实际调用的是Add(-1),但这种形式更简洁且语义更清晰。
    • 使用注意事项:每个调用Add方法的协程都必须在完成任务时调用Done方法,否则WaitGroup的计数器将永远不会归零,调用Wait方法的协程将一直阻塞。如果在没有调用Add的情况下调用Done,会导致计数器出现负数,同样引发未定义行为。
    • 示例
package main

import (
    "fmt"
    "sync"
    "time"
)

func task(wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Println("任务开始执行")
    time.Sleep(2 * time.Second)
    fmt.Println("任务执行完成")
}

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go task(&wg)
    wg.Wait()
    fmt.Println("所有任务已完成")
}
  1. Wait方法
    • 功能Wait方法用于阻塞当前协程,直到WaitGroup的计数器值变为零。当计数器归零,意味着所有调用Add方法对应的协程都已调用Done方法,即所有相关协程都已完成任务,此时Wait方法返回,程序继续执行后续代码。
    • 使用注意事项Wait方法应该在所有需要等待的协程启动并调用Add之后调用。如果在调用Add之前调用Wait,由于计数器初始值为零,Wait方法会立即返回,可能达不到预期的等待效果。
    • 示例
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            fmt.Printf("协程 %d 开始\n", id)
            time.Sleep(time.Second)
            fmt.Printf("协程 %d 结束\n", id)
        }(i)
    }
    fmt.Println("等待所有协程完成")
    wg.Wait()
    fmt.Println("所有协程已完成")
}

嵌套使用WaitGroup

在复杂的并发场景中,可能会遇到需要等待一组嵌套的协程完成的情况。这时候可以嵌套使用WaitGroup来实现。例如,有一个主任务,它启动多个子任务,每个子任务又启动多个孙任务。

package main

import (
    "fmt"
    "sync"
    "time"
)

func grandChildTask(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("孙任务 %d 开始\n", id)
    time.Sleep(500 * time.Millisecond)
    fmt.Printf("孙任务 %d 结束\n", id)
}

func childTask(id int, wg *sync.WaitGroup) {
    var grandChildWG sync.WaitGroup
    for i := 0; i < 2; i++ {
        grandChildWG.Add(1)
        go grandChildTask(i, &grandChildWG)
    }
    grandChildWG.Wait()
    fmt.Printf("子任务 %d 完成\n", id)
    wg.Done()
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go childTask(i, &wg)
    }
    wg.Wait()
    fmt.Println("所有任务已完成")
}

在上述代码中:

  • grandChildTask模拟孙任务,childTask模拟子任务,每个childTask启动两个grandChildTask
  • childTask内部创建一个新的WaitGroupgrandChildWG)来等待其启动的孙任务完成,完成后再调用外部传入的wg.Done()表示自己完成。
  • main函数通过wg.Wait()等待所有childTask完成。

错误处理与异常情况

  1. 计数器负数问题:如前文所述,在调用Add方法增加计数器值之前调用Done方法,或者调用Add时传入负数,都可能导致计数器变为负数。这会引发未定义行为,Wait方法的阻塞可能永远不会解除,或者程序可能崩溃。因此,务必确保在协程开始前调用Add,并且不要在没有相应Add的情况下调用Done
  2. 竞态条件:在并发环境中,如果对WaitGroup的操作没有正确同步,可能会出现竞态条件。例如,多个协程同时调用AddDone,可能导致计数器的值不准确。虽然WaitGroup内部对计数器的操作是原子性的,但在使用过程中仍需注意整体逻辑的正确性,确保在合适的时机调用相应方法。
  3. 泄漏问题:如果启动了协程并调用了Add,但由于某些原因(如协程内部发生恐慌或提前返回)没有调用Done,那么WaitGroup的计数器将不会归零,调用Wait的协程将永远阻塞,这就导致了资源泄漏。为了避免这种情况,在协程内部使用defer语句调用Done方法是一个良好的实践,确保无论协程以何种方式结束,Done方法都会被调用。

与其他同步原语结合使用

  1. Mutex:在需要保护共享资源时,WaitGroup通常与Mutex结合使用。例如,多个协程可能需要访问并修改同一个数据结构,此时可以使用Mutex来保证数据的一致性,同时使用WaitGroup来等待所有协程完成操作。
package main

import (
    "fmt"
    "sync"
)

type Counter struct {
    value int
    mu    sync.Mutex
}

func (c *Counter) Increment() {
    c.mu.Lock()
    c.value++
    c.mu.Unlock()
}

func (c *Counter) Get() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

func worker(c *Counter, wg *sync.WaitGroup) {
    defer wg.Done()
    c.Increment()
}

func main() {
    var wg sync.WaitGroup
    counter := Counter{}
    numWorkers := 10

    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go worker(&counter, &wg)
    }

    wg.Wait()
    fmt.Printf("Counter value: %d\n", counter.Get())
}

在上述代码中,Counter结构体使用Mutex来保护value字段的读写操作,worker协程通过WaitGroup来同步,确保所有协程完成对计数器的操作后再输出最终结果。

  1. ChannelWaitGroupChannel也经常一起使用。Channel可以用于在协程之间传递数据,而WaitGroup可以用于等待所有涉及数据处理的协程完成。例如,从一个Channel中读取数据并进行处理,当所有数据处理完成后,通过WaitGroup通知主协程。
package main

import (
    "fmt"
    "sync"
)

func dataProcessor(dataChan <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for data := range dataChan {
        fmt.Printf("处理数据: %d\n", data)
    }
}

func main() {
    var wg sync.WaitGroup
    dataChan := make(chan int)

    wg.Add(1)
    go dataProcessor(dataChan, &wg)

    for i := 0; i < 5; i++ {
        dataChan <- i
    }
    close(dataChan)

    wg.Wait()
    fmt.Println("所有数据处理完成")
}

在这个示例中,dataProcessor协程从dataChan中读取数据并处理,主协程向dataChan发送数据后关闭Channel,通过WaitGroup等待dataProcessor协程处理完所有数据。

性能优化

  1. 减少不必要的等待:在设计并发程序时,应尽量避免让WaitGroup等待过长时间。如果某些协程的任务可以并行执行,并且对最终结果没有先后顺序要求,应将它们并行化,而不是依次启动并等待。例如,如果有多个文件需要读取,可以并行启动多个协程进行读取,而不是逐个读取。
  2. 避免过度同步:虽然WaitGroup是一个强大的同步工具,但过度使用同步机制会降低程序的并发性能。确保只在必要时使用WaitGroup进行同步,对于一些独立的任务,尽量让它们在没有同步开销的情况下并行执行。
  3. 优化协程数量:根据系统资源和任务特性合理设置协程数量。过多的协程会增加系统调度开销,而过少的协程则无法充分利用系统资源。可以通过性能测试和分析来确定最优的协程数量。例如,在CPU密集型任务中,协程数量可能接近CPU核心数;而在I/O密集型任务中,可以适当增加协程数量以提高I/O利用率。

总结

WaitGroup是Go语言并发编程中不可或缺的同步原语,通过正确使用它,可以有效地协调多个协程的执行,确保程序按照预期的逻辑运行。在使用过程中,需要注意AddDoneWait方法的正确调用顺序,避免出现计数器负数、竞态条件和资源泄漏等问题。同时,结合其他同步原语如MutexChannel,以及优化并发性能的策略,可以构建出高效、稳定的并发程序。在实际开发中,要根据具体的业务需求和场景,灵活运用WaitGroup及其相关技术,充分发挥Go语言并发编程的优势。