MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Go WaitGroup在并发中的作用

2021-08-256.7k 阅读

Go 并发编程简介

在Go语言中,并发编程是其核心优势之一。Go通过轻量级的协程(goroutine)来实现高效的并发处理。一个goroutine可以看作是一个独立的执行单元,它非常轻量级,创建和销毁的开销极小。与传统线程相比,goroutine的调度由Go运行时(runtime)负责,采用M:N调度模型,即多个goroutine映射到多个操作系统线程上,这种调度模型使得Go在处理大量并发任务时能够表现出色。

例如,我们来看一个简单的示例,通过go关键字来启动一个goroutine:

package main

import (
    "fmt"
    "time"
)

func printMessage() {
    fmt.Println("Hello from goroutine")
}

func main() {
    go printMessage()
    time.Sleep(1 * time.Second)
    fmt.Println("Main function")
}

在上述代码中,go printMessage()语句启动了一个新的goroutine来执行printMessage函数。主函数main会继续执行后续代码,不会等待printMessage函数执行完毕。这里使用time.Sleep是为了确保在主函数退出之前,goroutine有机会执行并打印消息。

然而,这种简单的并发模式在实际应用中往往不够完善。当有多个goroutine同时执行时,我们经常需要一种机制来协调它们的执行,确保所有的goroutine都完成任务后再进行下一步操作。这就是WaitGroup发挥作用的地方。

WaitGroup概述

WaitGroup是Go标准库sync包中的一个类型,用于等待一组goroutine完成。它提供了一种同步机制,允许一个goroutine等待其他多个goroutine执行完毕。WaitGroup内部维护一个计数器,通过调用Add方法来增加计数器的值,调用Done方法来减少计数器的值,调用Wait方法来阻塞当前goroutine,直到计数器的值变为0。

WaitGroup的方法详解

Add方法

Add方法用于增加WaitGroup的计数器。它接受一个整数参数delta,将计数器增加delta的值。例如,如果delta为1,则计数器加1,表示有一个新的goroutine开始执行任务。

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        fmt.Println("Goroutine is running")
    }()
    wg.Wait()
    fmt.Println("All goroutines have finished")
}

在上述代码中,wg.Add(1)WaitGroup的计数器设置为1。当启动的goroutine执行完毕并调用wg.Done()后,计数器减为0,wg.Wait()解除阻塞,主函数继续执行并打印“All goroutines have finished”。

需要注意的是,如果在WaitGroup已经开始等待(即调用了Wait方法)后再调用Add方法,可能会导致程序死锁。因此,通常建议在启动goroutine之前调用Add方法。

Done方法

Done方法实际上是Add(-1)的快捷方式,用于将WaitGroup的计数器减1。通常在goroutine的末尾调用Done方法,以表示该goroutine的任务已经完成。在实际编程中,我们经常使用defer关键字来确保Done方法一定会被调用,即使goroutine中发生了恐慌(panic)。

package main

import (
    "fmt"
    "sync"
)

func worker(wg *sync.WaitGroup, id int) {
    defer wg.Done()
    fmt.Printf("Worker %d is working\n", id)
    // 模拟一些工作
    for i := 0; i < 1000000; i++ {
        // 空循环,消耗一些时间
    }
    fmt.Printf("Worker %d has finished\n", id)
}

func main() {
    var wg sync.WaitGroup
    numWorkers := 5
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go worker(&wg, i)
    }
    wg.Wait()
    fmt.Println("All workers have finished")
}

在这个示例中,worker函数在开始时使用defer wg.Done(),无论函数正常结束还是发生异常,都会将WaitGroup的计数器减1。主函数通过wg.Wait()等待所有的worker goroutine完成任务。

Wait方法

Wait方法会阻塞调用它的goroutine,直到WaitGroup的计数器变为0。这意味着所有调用了Add方法并随后调用Done方法的goroutine都已经完成了它们的任务。

WaitGroup在实际场景中的应用

多任务并行处理

假设我们有一个任务,需要从多个远程服务器获取数据,然后对这些数据进行汇总分析。我们可以使用WaitGroup来管理这些并发的任务。

package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "sync"
)

func fetchData(url string, wg *sync.WaitGroup, resultChan chan string) {
    defer wg.Done()
    resp, err := http.Get(url)
    if err != nil {
        resultChan <- fmt.Sprintf("Error fetching %s: %v", url, err)
        return
    }
    defer resp.Body.Close()
    data, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        resultChan <- fmt.Sprintf("Error reading data from %s: %v", url, err)
        return
    }
    resultChan <- fmt.Sprintf("Data from %s: %s", url, data)
}

func main() {
    urls := []string{
        "http://example.com",
        "http://google.com",
        "http://github.com",
    }
    var wg sync.WaitGroup
    resultChan := make(chan string, len(urls))
    for _, url := range urls {
        wg.Add(1)
        go fetchData(url, &wg, resultChan)
    }
    go func() {
        wg.Wait()
        close(resultChan)
    }()
    for result := range resultChan {
        fmt.Println(result)
    }
    fmt.Println("All data fetched and processed")
}

在上述代码中,我们为每个URL启动一个goroutine来获取数据。WaitGroup用于确保所有的fetchData goroutine都完成任务后再关闭resultChan。主函数通过从resultChan中读取数据来处理获取到的结果。

任务依赖关系处理

有时候,我们的任务之间存在依赖关系。例如,任务A需要等待任务B和任务C都完成后才能开始执行。我们可以使用WaitGroup来处理这种情况。

package main

import (
    "fmt"
    "sync"
)

func taskB(wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Println("Task B is running")
    // 模拟一些工作
    for i := 0; i < 1000000; i++ {
        // 空循环,消耗一些时间
    }
    fmt.Println("Task B has finished")
}

func taskC(wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Println("Task C is running")
    // 模拟一些工作
    for i := 0; i < 1000000; i++ {
        // 空循环,消耗一些时间
    }
    fmt.Println("Task C has finished")
}

func taskA(wg1, wg2 *sync.WaitGroup) {
    wg1.Wait()
    wg2.Wait()
    fmt.Println("Task A is running, since B and C are done")
    // 模拟一些工作
    for i := 0; i < 1000000; i++ {
        // 空循环,消耗一些时间
    }
    fmt.Println("Task A has finished")
}

func main() {
    var wg1, wg2 sync.WaitGroup
    wg1.Add(1)
    wg2.Add(1)
    go taskB(&wg1)
    go taskC(&wg2)
    go taskA(&wg1, &wg2)
    // 防止主函数过早退出
    select {}
}

在这个例子中,taskA需要等待taskBtaskC都完成。通过WaitGroup,我们可以很方便地实现这种任务之间的依赖关系。

WaitGroup的注意事项

避免死锁

如前文所述,在WaitGroup已经开始等待(调用了Wait方法)后再调用Add方法可能会导致死锁。此外,如果在Done方法调用次数少于Add方法调用次数,Wait方法也会永远阻塞,从而导致死锁。

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        // 忘记调用wg.Done()
        fmt.Println("Goroutine is running")
    }()
    wg.Wait()
    fmt.Println("This will never be printed")
}

在上述代码中,由于goroutine中没有调用wg.Done()wg.Wait()会永远阻塞,导致程序死锁。

正确使用计数器

在使用Add方法时,要确保增加的计数器数量与实际启动的goroutine数量一致。如果计数器设置过多,Wait方法可能会等待不必要的时间;如果计数器设置过少,可能会导致部分goroutine的任务还未完成时就继续执行后续代码。

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    // 只Add了1,但是启动了3个goroutine
    wg.Add(1)
    for i := 0; i < 3; i++ {
        go func(id int) {
            defer wg.Done()
            fmt.Printf("Goroutine %d is running\n", id)
        }(i)
    }
    wg.Wait()
    fmt.Println("All goroutines may not have finished")
}

在这个例子中,由于wg.Add(1)只增加了1,而启动了3个goroutine,可能会导致部分goroutine还未完成任务时,wg.Wait()就解除阻塞,从而输出“All goroutines may not have finished”。

并发安全

WaitGroup是并发安全的,可以在多个goroutine中同时使用。这意味着不同的goroutine可以安全地调用AddDoneWait方法,而无需额外的同步机制。

package main

import (
    "fmt"
    "sync"
)

func worker(wg *sync.WaitGroup, id int) {
    defer wg.Done()
    fmt.Printf("Worker %d is working\n", id)
    // 模拟一些工作
    for i := 0; i < 1000000; i++ {
        // 空循环,消耗一些时间
    }
    fmt.Printf("Worker %d has finished\n", id)
}

func main() {
    var wg sync.WaitGroup
    numWorkers := 5
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go worker(&wg, i)
    }
    // 多个goroutine可能同时调用wg.Done(),但这是安全的
    wg.Wait()
    fmt.Println("All workers have finished")
}

在这个示例中,多个worker goroutine同时调用wg.Done(),由于WaitGroup是并发安全的,不会出现数据竞争等问题。

WaitGroup与其他同步机制的比较

与Mutex的比较

Mutex(互斥锁)主要用于保护共享资源,防止多个goroutine同时访问,从而避免数据竞争。而WaitGroup主要用于等待一组goroutine完成任务,并不直接涉及共享资源的保护。

例如,假设我们有一个共享变量count,多个goroutine需要对其进行递增操作。为了防止数据竞争,我们使用Mutex

package main

import (
    "fmt"
    "sync"
)

var (
    count int
    mu    sync.Mutex
)

func increment(wg *sync.WaitGroup) {
    defer wg.Done()
    mu.Lock()
    count++
    mu.Unlock()
}

func main() {
    var wg sync.WaitGroup
    numGoroutines := 1000
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go increment(&wg)
    }
    wg.Wait()
    fmt.Printf("Final count: %d\n", count)
}

在这个例子中,Mutex用于保护count变量,确保每次只有一个goroutine可以对其进行操作。WaitGroup则用于等待所有的increment goroutine完成任务,以便输出最终的count值。

与Channel的比较

Channel(通道)用于在goroutine之间进行通信和同步。它可以传递数据,并且可以通过阻塞来实现同步。WaitGroup则侧重于等待一组goroutine完成,不涉及数据传递。

例如,我们可以使用Channel来实现类似WaitGroup的功能:

package main

import (
    "fmt"
)

func worker(done chan struct{}) {
    fmt.Println("Goroutine is running")
    // 模拟一些工作
    for i := 0; i < 1000000; i++ {
        // 空循环,消耗一些时间
    }
    fmt.Println("Goroutine has finished")
    done <- struct{}{}
}

func main() {
    numGoroutines := 3
    done := make(chan struct{}, numGoroutines)
    for i := 0; i < numGoroutines; i++ {
        go worker(done)
    }
    for i := 0; i < numGoroutines; i++ {
        <-done
    }
    close(done)
    fmt.Println("All goroutines have finished")
}

在这个例子中,通过向done通道发送数据来表示goroutine完成任务,主函数通过从done通道接收数据来等待所有goroutine完成。虽然可以实现类似WaitGroup的功能,但使用Channel实现这种等待逻辑相对复杂,而WaitGroup提供了一种更简洁、专门用于等待一组goroutine完成的机制。

总结

WaitGroup是Go语言并发编程中一个非常重要的工具,它为我们提供了一种简单而有效的方式来等待一组goroutine完成任务。通过合理使用AddDoneWait方法,我们可以解决各种并发场景下的同步问题,无论是多任务并行处理还是处理任务之间的依赖关系。在使用WaitGroup时,要注意避免死锁,正确设置计数器,并充分利用其并发安全的特性。与其他同步机制如MutexChannel相比,WaitGroup有着明确的适用场景,能够让我们的并发代码更加简洁和高效。掌握WaitGroup的使用方法是Go语言开发者进行高效并发编程的必备技能之一。在实际项目中,我们可以根据具体的需求和场景,灵活运用WaitGroup以及其他同步机制,构建出健壮、高效的并发程序。

通过深入理解WaitGroup在并发中的作用,我们可以更好地利用Go语言的并发特性,提升程序的性能和响应能力。无论是开发网络应用、分布式系统还是高性能计算程序,WaitGroup都将是我们的得力助手。在日常编程中,不断实践和总结使用WaitGroup的经验,能够让我们在面对复杂的并发问题时更加游刃有余。同时,结合Go语言的其他并发特性,如goroutine、channel等,我们可以构建出更加灵活和强大的并发应用程序。例如,在一个大型的分布式数据处理系统中,可能会有多个数据采集任务并行执行,然后将采集到的数据汇总到一个中心节点进行处理。这时,WaitGroup可以用于等待所有数据采集任务完成,确保数据的完整性,然后再进行后续的处理。又如,在一个高并发的Web服务器中,可能会有多个请求处理任务同时执行,WaitGroup可以帮助我们在所有请求处理完成后,进行一些清理工作或者统计任务的执行情况。总之,WaitGroup在Go语言的并发编程中扮演着不可或缺的角色,值得我们深入学习和掌握。