MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

go 中 WaitGroup 的应用与实例

2024-02-176.7k 阅读

Go 语言并发编程基础

在深入探讨 WaitGroup 之前,我们先来回顾一下 Go 语言并发编程的一些基础知识。Go 语言在设计之初就将并发编程作为其核心特性之一,通过 goroutinechannel 提供了简洁而高效的并发编程模型。

goroutine

goroutine 是 Go 语言中实现并发的轻量级线程。与操作系统线程相比,goroutine 的创建和销毁成本极低,使得我们可以轻松创建数以万计的并发任务。启动一个 goroutine 非常简单,只需要在函数调用前加上 go 关键字即可。例如:

package main

import (
    "fmt"
    "time"
)

func worker(id int) {
    fmt.Printf("Worker %d started\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d finished\n", id)
}

func main() {
    for i := 1; i <= 3; i++ {
        go worker(i)
    }
    time.Sleep(2 * time.Second)
    fmt.Println("Main function exiting")
}

在上述代码中,main 函数通过循环启动了 3 个 goroutine 来执行 worker 函数。每个 worker 函数模拟了一个耗时任务,通过 time.Sleep 暂停 1 秒。main 函数在启动完所有 goroutine 后,通过 time.Sleep(2 * time.Second) 等待一段时间,以确保所有 goroutine 有足够时间执行完毕,最后输出 "Main function exiting"。

channel

channel 是 Go 语言中用于在 goroutine 之间进行通信和同步的机制。它可以被看作是一个类型安全的管道,数据可以从一端发送,从另一端接收。创建一个 channel 的语法如下:

ch := make(chan int)

这里创建了一个可以传输 int 类型数据的 channel。向 channel 发送数据使用 <- 操作符:

ch <- 10

channel 接收数据也使用 <- 操作符:

data := <-ch

下面是一个简单的 channel 示例,展示了两个 goroutine 之间通过 channel 进行数据传递:

package main

import (
    "fmt"
)

func sender(ch chan int) {
    for i := 1; i <= 5; i++ {
        ch <- i
    }
    close(ch)
}

func receiver(ch chan int) {
    for data := range ch {
        fmt.Printf("Received: %d\n", data)
    }
}

func main() {
    ch := make(chan int)
    go sender(ch)
    go receiver(ch)
    select {}
}

在这个示例中,sender goroutinechannel 中发送 1 到 5 的整数,然后通过 close(ch) 关闭 channelreceiver goroutine 使用 for... range 循环从 channel 中接收数据,直到 channel 关闭。main 函数中的 select {} 语句是为了防止 main 函数提前退出,因为 main 函数是整个程序的初始 goroutine,一旦 main 函数返回,程序就会结束。

WaitGroup 概述

WaitGroup 是 Go 语言标准库 sync 包中的一个类型,用于协调多个 goroutine 的同步。它提供了一种简单的方式来等待一组 goroutine 全部完成任务。WaitGroup 内部维护了一个计数器,通过 Add 方法增加计数器的值,通过 Done 方法减少计数器的值,通过 Wait 方法阻塞当前 goroutine,直到计数器的值变为 0。

WaitGroup 的结构

WaitGroup 的结构在 sync 包的源码中定义如下:

// src/sync/waitgroup.go
type WaitGroup struct {
    noCopy noCopy
    state1 [3]uint32
}

state1 字段是一个包含 3 个 uint32 类型的数组,用于存储计数器的值和等待队列的信息。虽然我们通常不需要直接操作这个结构,但了解其大致构成有助于理解 WaitGroup 的工作原理。

WaitGroup 的方法

  1. Add(delta int):将计数器增加 delta。如果 delta 为负数,会导致 panic。通常在启动 goroutine 之前调用 Add 方法,传入需要等待的 goroutine 的数量。
  2. Done():将计数器减 1,等同于 Add(-1)。一般在 goroutine 完成任务后调用。
  3. Wait():阻塞当前 goroutine,直到计数器的值变为 0。

WaitGroup 的基本应用

简单示例:等待多个 goroutine 完成

下面我们通过一个简单的示例来展示如何使用 WaitGroup 等待多个 goroutine 完成任务。假设我们有一组任务,每个任务模拟一个耗时操作,我们希望在所有任务完成后再继续执行主程序。

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d started\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d finished\n", id)
}

func main() {
    var wg sync.WaitGroup
    numWorkers := 3
    wg.Add(numWorkers)

    for i := 1; i <= numWorkers; i++ {
        go worker(i, &wg)
    }

    wg.Wait()
    fmt.Println("All workers have finished")
}

在这个示例中:

  1. 我们在 main 函数中创建了一个 WaitGroup 实例 wg
  2. 使用 wg.Add(numWorkers) 将计数器设置为需要等待的 goroutine 的数量,这里是 3。
  3. 通过循环启动 3 个 goroutine,每个 goroutine 执行 worker 函数,并将 wg 的指针传递给 worker 函数。
  4. worker 函数中,通过 defer wg.Done() 确保在函数结束时将计数器减 1。
  5. main 函数调用 wg.Wait() 阻塞,直到所有 goroutine 完成任务,计数器变为 0,然后输出 "All workers have finished"。

避免在 goroutine 中调用 Add

在使用 WaitGroup 时,有一个重要的注意事项:不要在 goroutine 内部调用 Add 方法。这是因为 Add 方法可能会导致 WaitGroup 的状态发生变化,而在 goroutine 中调用可能会引发竞态条件。例如:

package main

import (
    "fmt"
    "sync"
    "time"
)

func badWorker(wg *sync.WaitGroup) {
    wg.Add(1)
    fmt.Println("Bad worker started")
    time.Sleep(time.Second)
    fmt.Println("Bad worker finished")
    wg.Done()
}

func main() {
    var wg sync.WaitGroup
    go badWorker(&wg)
    wg.Wait()
    fmt.Println("Main function exiting")
}

在上述代码中,badWorker goroutine 内部调用了 wg.Add(1),这可能会导致在 main 函数调用 wg.Wait() 时,计数器还未正确设置,从而引发未定义行为。正确的做法是在启动 goroutine 之前就设置好计数器的值。

WaitGroup 在复杂场景中的应用

分组等待多个 goroutine 组

在实际应用中,我们可能需要管理多个不同的 goroutine 组,每个组有不同的任务,并且我们希望能够分别等待每个组的任务完成。WaitGroup 可以很方便地实现这一点。下面是一个示例,展示了如何等待两个不同组的 goroutine 完成:

package main

import (
    "fmt"
    "sync"
    "time"
)

func group1Worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Group1 Worker %d started\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Group1 Worker %d finished\n", id)
}

func group2Worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Group2 Worker %d started\n", id)
    time.Sleep(2 * time.Second)
    fmt.Printf("Group2 Worker %d finished\n", id)
}

func main() {
    var wg1, wg2 sync.WaitGroup
    numGroup1Workers := 2
    numGroup2Workers := 3

    wg1.Add(numGroup1Workers)
    wg2.Add(numGroup2Workers)

    for i := 1; i <= numGroup1Workers; i++ {
        go group1Worker(i, &wg1)
    }

    for i := 1; i <= numGroup2Workers; i++ {
        go group2Worker(i, &wg2)
    }

    fmt.Println("Waiting for Group1 to finish")
    wg1.Wait()
    fmt.Println("Group1 has finished")

    fmt.Println("Waiting for Group2 to finish")
    wg2.Wait()
    fmt.Println("Group2 has finished")

    fmt.Println("All groups have finished")
}

在这个示例中:

  1. 我们创建了两个 WaitGroup 实例 wg1wg2,分别用于管理两个不同组的 goroutine
  2. 为每个组的 goroutine 数量设置相应的计数器值。
  3. 分别启动两个组的 goroutine,每个 goroutine 在完成任务时调用对应的 wg.Done()
  4. 通过分别调用 wg1.Wait()wg2.Wait() 等待两个组的任务依次完成,并输出相应的提示信息。

实现任务流水线

WaitGroup 还可以用于实现任务流水线,即一个任务的输出作为下一个任务的输入,并且需要等待每个阶段的任务全部完成。下面是一个简单的任务流水线示例,模拟数据处理的三个阶段:读取数据、处理数据和输出数据。

package main

import (
    "fmt"
    "sync"
)

func readData(data chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 1; i <= 5; i++ {
        data <- i
    }
    close(data)
}

func processData(input <-chan int, output chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for num := range input {
        output <- num * num
    }
    close(output)
}

func writeData(input <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for result := range input {
        fmt.Printf("Processed result: %d\n", result)
    }
}

func main() {
    var wg sync.WaitGroup
    dataCh := make(chan int)
    processedCh := make(chan int)

    wg.Add(3)
    go readData(dataCh, &wg)
    go processData(dataCh, processedCh, &wg)
    go writeData(processedCh, &wg)

    wg.Wait()
    fmt.Println("All tasks in the pipeline have finished")
}

在这个示例中:

  1. readData 函数将数据写入 dataCh 通道,完成后关闭通道,并调用 wg.Done()
  2. processData 函数从 dataCh 通道读取数据,进行平方处理后写入 processedCh 通道,完成后关闭通道,并调用 wg.Done()
  3. writeData 函数从 processedCh 通道读取处理后的数据并输出,完成后调用 wg.Done()
  4. main 函数通过 wg.Wait() 等待所有三个阶段的任务完成。

WaitGroup 与其他同步机制的结合使用

WaitGroup 与 Mutex

在并发编程中,我们经常需要保护共享资源,防止多个 goroutine 同时访问导致数据竞争。Mutex(互斥锁)是 Go 语言中用于实现互斥访问的机制。WaitGroup 可以与 Mutex 结合使用,确保在多个 goroutine 访问共享资源时的安全性。下面是一个示例,展示了多个 goroutine 对共享资源的安全访问:

package main

import (
    "fmt"
    "sync"
)

type Counter struct {
    value int
    mu    sync.Mutex
}

func (c *Counter) Increment(wg *sync.WaitGroup) {
    defer wg.Done()
    c.mu.Lock()
    c.value++
    c.mu.Unlock()
}

func main() {
    var wg sync.WaitGroup
    counter := Counter{}
    numGoroutines := 10

    wg.Add(numGoroutines)
    for i := 0; i < numGoroutines; i++ {
        go counter.Increment(&wg)
    }

    wg.Wait()
    fmt.Printf("Final counter value: %d\n", counter.value)
}

在这个示例中:

  1. Counter 结构体包含一个 value 字段用于存储计数值,以及一个 sync.Mutex 类型的 mu 字段用于保护 value
  2. Increment 方法在增加 value 之前通过 c.mu.Lock() 锁定互斥锁,增加完成后通过 c.mu.Unlock() 解锁,确保同一时间只有一个 goroutine 可以访问 value
  3. main 函数启动 10 个 goroutine 调用 Increment 方法,并通过 WaitGroup 等待所有 goroutine 完成,最后输出最终的计数值。

WaitGroup 与 Channel

WaitGroupchannel 都是 Go 语言中重要的并发编程工具,它们可以相互配合实现更复杂的同步和通信需求。例如,我们可以使用 channel 来通知 goroutine 任务的开始和结束,同时使用 WaitGroup 来等待所有相关 goroutine 完成。下面是一个示例,展示了如何通过 channelWaitGroup 实现任务的有序启动和结束:

package main

import (
    "fmt"
    "sync"
    "time"
)

func task(id int, startCh <-chan struct{}, doneCh chan<- struct{}, wg *sync.WaitGroup) {
    defer wg.Done()
    <-startCh
    fmt.Printf("Task %d started\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Task %d finished\n", id)
    doneCh <- struct{}{}
}

func main() {
    var wg sync.WaitGroup
    numTasks := 3
    startCh := make(chan struct{})
    doneCh := make(chan struct{})

    wg.Add(numTasks)
    for i := 1; i <= numTasks; i++ {
        go task(i, startCh, doneCh, &wg)
    }

    fmt.Println("Starting all tasks")
    close(startCh)

    for i := 0; i < numTasks; i++ {
        <-doneCh
    }
    close(doneCh)

    wg.Wait()
    fmt.Println("All tasks have finished")
}

在这个示例中:

  1. task 函数通过 startCh 等待任务开始信号,接收到信号后开始执行任务,完成后通过 doneCh 发送完成信号,并调用 wg.Done()
  2. main 函数启动 3 个 goroutine 执行 task 函数,并通过 WaitGroup 等待它们完成。
  3. 通过关闭 startCh 通道来通知所有 goroutine 开始任务,然后通过循环从 doneCh 通道接收完成信号,确保所有任务都已完成,最后关闭 doneCh 通道。

WaitGroup 的性能考虑

减少不必要的等待

在使用 WaitGroup 时,要尽量减少不必要的等待时间。例如,如果有一些 goroutine 的任务执行时间较短,而另一些较长,我们可以考虑将短任务和长任务分开处理,避免短任务长时间等待长任务。下面是一个优化示例,展示了如何将任务按照执行时间长短进行分组处理:

package main

import (
    "fmt"
    "sync"
    "time"
)

func shortTask(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Short Task %d started\n", id)
    time.Sleep(100 * time.Millisecond)
    fmt.Printf("Short Task %d finished\n", id)
}

func longTask(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Long Task %d started\n", id)
    time.Sleep(1 * time.Second)
    fmt.Printf("Long Task %d finished\n", id)
}

func main() {
    var shortWG, longWG sync.WaitGroup
    numShortTasks := 3
    numLongTasks := 2

    shortWG.Add(numShortTasks)
    longWG.Add(numLongTasks)

    for i := 1; i <= numShortTasks; i++ {
        go shortTask(i, &shortWG)
    }

    for i := 1; i <= numLongTasks; i++ {
        go longTask(i, &longWG)
    }

    fmt.Println("Waiting for short tasks to finish")
    shortWG.Wait()
    fmt.Println("Short tasks have finished")

    fmt.Println("Waiting for long tasks to finish")
    longWG.Wait()
    fmt.Println("Long tasks have finished")

    fmt.Println("All tasks have finished")
}

在这个示例中,我们将任务分为短任务和长任务两组,分别使用 shortWGlongWG 进行等待。这样,短任务完成后可以及时继续执行后续逻辑,而不需要等待长任务。

避免过度使用 WaitGroup

虽然 WaitGroup 是一个强大的同步工具,但过度使用可能会导致代码的可读性和性能下降。例如,如果在一个复杂的并发系统中,每个小的子任务都使用 WaitGroup 进行同步,可能会导致代码结构变得混乱,并且增加了不必要的同步开销。在设计并发程序时,要根据具体需求合理选择同步机制,尽量使用更轻量级的方式实现同步,只有在必要时才使用 WaitGroup

总结

WaitGroup 是 Go 语言并发编程中一个非常实用的工具,它提供了一种简单而有效的方式来等待一组 goroutine 完成任务。通过合理运用 WaitGroup,我们可以实现复杂的并发场景,如分组等待、任务流水线等。同时,结合 Mutexchannel 等其他同步机制,WaitGroup 可以进一步增强程序的安全性和灵活性。在使用 WaitGroup 时,要注意遵循其使用规范,避免在 goroutine 内部调用 Add 方法,同时要考虑性能因素,减少不必要的等待和同步开销。希望通过本文的介绍和示例,你对 WaitGroup 的应用有了更深入的理解,能够在实际的 Go 语言项目中灵活运用它来实现高效的并发编程。