MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

使用WaitGroup实现并发任务的同步等待

2022-04-286.8k 阅读

Go语言并发编程基础

在深入探讨WaitGroup之前,我们先来回顾一下Go语言并发编程的基础概念。Go语言在设计之初就将并发编程作为其核心特性之一,通过goroutinechannel提供了简洁而强大的并发编程模型。

goroutine

goroutine是Go语言中实现并发的轻量级线程。与传统线程相比,goroutine的创建和销毁开销极小,使得在程序中可以轻松创建成千上万的并发任务。在Go语言中,使用go关键字来启动一个goroutine。例如:

package main

import (
    "fmt"
)

func sayHello() {
    fmt.Println("Hello, Goroutine!")
}

func main() {
    go sayHello()
    fmt.Println("Main function")
}

在上述代码中,go sayHello()启动了一个新的goroutine来执行sayHello函数。然而,运行这段代码你可能会发现,控制台只输出了“Main function”,而没有输出“Hello, Goroutine!”。这是因为main函数是程序的主goroutine,当main函数执行完毕后,程序就会结束,即使其他goroutine还在运行中。这就引出了我们需要解决的问题:如何让主goroutine等待其他goroutine完成任务后再结束。

channel

channel是Go语言中用于在goroutine之间进行通信和同步的机制。它可以被看作是一个管道,数据可以从一端发送,从另一端接收。channel的使用可以避免共享内存带来的并发问题,实现数据的安全传递。例如:

package main

import (
    "fmt"
)

func sendData(ch chan int) {
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch)
}

func main() {
    ch := make(chan int)
    go sendData(ch)
    for data := range ch {
        fmt.Println("Received:", data)
    }
}

在这段代码中,sendData函数通过channel发送数据,main函数通过for... range循环从channel中接收数据,直到channel被关闭。channel虽然可以实现goroutine之间的同步,但在某些场景下,比如需要等待一组goroutine全部完成任务时,使用channel就显得不够简洁和直观。这时,WaitGroup就派上用场了。

WaitGroup的基本概念

WaitGroup是Go语言标准库sync包中的一个类型,它用于实现多个goroutine之间的同步等待。WaitGroup内部维护一个计数器,通过对计数器的操作来控制goroutine的等待和完成。

WaitGroup的主要方法

  1. Add(delta int):该方法用于增加WaitGroup内部计数器的值。delta参数表示增加的数量,可以是正数或负数,但通常为正数。例如,wg.Add(1)表示增加1,wg.Add(3)表示增加3。
  2. Done():该方法用于减少WaitGroup内部计数器的值,相当于wg.Add(-1)。通常在一个goroutine完成任务后调用此方法。
  3. Wait():该方法会阻塞当前goroutine,直到WaitGroup内部计数器的值变为0。也就是说,只有当所有调用Done()方法将计数器减为0时,Wait()方法才会返回,继续执行后续代码。

使用WaitGroup实现简单的并发任务同步

下面我们通过一个简单的示例来演示如何使用WaitGroup实现并发任务的同步等待。假设我们有多个任务需要并发执行,每个任务打印出自己的编号,并且在所有任务完成后,主goroutine打印“All tasks are done”。

package main

import (
    "fmt"
    "sync"
)

func task(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Task %d is running\n", id)
}

func main() {
    var wg sync.WaitGroup
    numTasks := 5
    for i := 1; i <= numTasks; i++ {
        wg.Add(1)
        go task(i, &wg)
    }
    wg.Wait()
    fmt.Println("All tasks are done")
}

在上述代码中:

  1. 首先在main函数中声明了一个WaitGroup变量wg
  2. 使用for循环启动5个goroutine,每个goroutine执行task函数。在启动每个goroutine之前,调用wg.Add(1)增加计数器的值。
  3. task函数中,通过defer wg.Done()在函数结束时减少计数器的值。defer关键字确保无论task函数如何结束(正常返回或发生错误),wg.Done()都会被调用。
  4. 最后,在main函数中调用wg.Wait(),主goroutine会阻塞在这里,直到所有goroutine完成任务(即计数器的值变为0),然后打印“All tasks are done”。

WaitGroup在复杂并发场景中的应用

在实际开发中,并发场景往往更加复杂。例如,我们可能有多个不同类型的任务,需要分组等待,或者在任务执行过程中动态地添加新的任务。下面我们通过几个示例来展示WaitGroup在这些复杂场景中的应用。

分组等待任务

假设我们有两组任务,第一组任务打印“A”,第二组任务打印“B”。我们希望在两组任务分别完成后,打印相应的完成信息。

package main

import (
    "fmt"
    "sync"
)

func groupATask(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Group A Task %d is running\n", id)
}

func groupBTask(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Group B Task %d is running\n", id)
}

func main() {
    var wgA, wgB sync.WaitGroup
    numGroupATasks := 3
    numGroupBTasks := 2
    for i := 1; i <= numGroupATasks; i++ {
        wgA.Add(1)
        go groupATask(i, &wgA)
    }
    for i := 1; i <= numGroupBTasks; i++ {
        wgB.Add(1)
        go groupBTask(i, &wgB)
    }
    wgA.Wait()
    fmt.Println("Group A tasks are done")
    wgB.Wait()
    fmt.Println("Group B tasks are done")
}

在这个示例中:

  1. 声明了两个WaitGroup变量wgAwgB,分别用于两组任务的同步。
  2. 通过两个for循环分别启动两组goroutine,并对相应的WaitGroup增加计数器。
  3. 分别调用wgA.Wait()wgB.Wait()等待两组任务完成,并在每组任务完成后打印相应的完成信息。

动态添加任务

在某些情况下,任务的数量可能不是固定的,而是在程序运行过程中动态生成的。例如,我们有一个任务生成器,它会根据一定的条件不断生成新的任务,并且需要等待所有生成的任务完成。

package main

import (
    "fmt"
    "sync"
    "time"
)

func dynamicTask(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Dynamic Task %d is running\n", id)
    time.Sleep(time.Second)
}

func taskGenerator(wg *sync.WaitGroup) {
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go dynamicTask(i, wg)
        time.Sleep(time.Millisecond * 500)
    }
}

func main() {
    var wg sync.WaitGroup
    go taskGenerator(&wg)
    time.Sleep(time.Second * 2)
    for i := 6; i <= 8; i++ {
        wg.Add(1)
        go dynamicTask(i, &wg)
    }
    wg.Wait()
    fmt.Println("All dynamic tasks are done")
}

在上述代码中:

  1. taskGenerator函数会在启动后每隔500毫秒生成一个新的任务,并添加到WaitGroup中。
  2. main函数中,先启动taskGenerator,然后等待2秒,模拟程序运行过程中动态添加新任务的场景。
  3. 再次通过for循环添加3个新任务,并调用wg.Wait()等待所有任务完成。

WaitGroup的注意事项

在使用WaitGroup时,有一些注意事项需要我们关注,以避免出现并发错误。

避免计数器操作错误

  1. Add操作时机Add方法应该在启动goroutine之前调用,确保计数器在goroutine开始执行任务前已经正确设置。如果在goroutine执行过程中调用Add,可能会导致goroutine已经执行Done方法后计数器还未正确增加,从而出现同步错误。
  2. Done操作次数:每个启动的goroutine都应该对应一次Done操作,且不能多调用。如果某个goroutine没有调用DoneWaitGroup的计数器将永远不会变为0,Wait方法会一直阻塞。反之,如果某个goroutine多次调用Done,可能会导致计数器变为负数,引发运行时错误。

避免死锁

死锁是并发编程中常见的问题,在使用WaitGroup时也需要注意避免。例如,以下情况可能会导致死锁:

package main

import (
    "fmt"
    "sync"
)

func deadlockTask(wg *sync.WaitGroup) {
    wg.Wait()
    fmt.Println("Deadlock Task")
    wg.Done()
}

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go deadlockTask(&wg)
    wg.Wait()
    fmt.Println("Main function")
}

在上述代码中,deadlockTask函数中先调用了wg.Wait(),而此时wg的计数器为1,没有其他goroutine调用Done方法将计数器减为0,所以deadlockTask会一直阻塞。同时,main函数中也调用了wg.Wait(),这就导致了两个goroutine相互等待,产生死锁。为了避免这种情况,我们需要确保goroutine的逻辑正确,Wait操作应该在所有需要等待的任务都启动并且计数器设置正确之后进行。

错误处理

在实际应用中,goroutine可能会因为各种原因发生错误。当使用WaitGroup时,我们需要考虑如何处理这些错误。一种常见的做法是通过channel传递错误信息。例如:

package main

import (
    "fmt"
    "sync"
)

func errorTask(id int, wg *sync.WaitGroup, errCh chan error) {
    defer wg.Done()
    if id == 3 {
        errCh <- fmt.Errorf("Task %d has an error", id)
        return
    }
    fmt.Printf("Task %d is running\n", id)
}

func main() {
    var wg sync.WaitGroup
    numTasks := 5
    errCh := make(chan error, numTasks)
    for i := 1; i <= numTasks; i++ {
        wg.Add(1)
        go errorTask(i, &wg, errCh)
    }
    go func() {
        wg.Wait()
        close(errCh)
    }()
    for err := range errCh {
        if err != nil {
            fmt.Println("Error:", err)
        }
    }
    fmt.Println("All tasks are processed")
}

在这个示例中:

  1. errorTask函数在任务执行过程中如果遇到错误,会通过errCh这个channel发送错误信息。
  2. main函数中,启动所有goroutine后,使用一个单独的goroutine等待WaitGroup完成,然后关闭errCh
  3. 通过for... range循环从errCh中接收错误信息并进行处理,确保在所有任务完成后,能够及时发现并处理可能出现的错误。

WaitGroup与其他同步机制的比较

在Go语言的并发编程中,除了WaitGroup,还有其他一些同步机制,如MutexRWMutexCond等。下面我们来比较一下WaitGroup与这些同步机制的特点和适用场景。

WaitGroup与Mutex

  1. 功能特点
    • Mutex(互斥锁)主要用于保护共享资源,确保在同一时间只有一个goroutine能够访问共享资源,从而避免数据竞争。它通过锁定和解锁操作来实现对共享资源的独占访问。
    • WaitGroup则专注于实现goroutine之间的同步等待,它不涉及对共享资源的直接访问控制,而是通过计数器来协调多个goroutine的执行顺序。
  2. 适用场景
    • 当需要保护共享数据,防止多个goroutine同时修改导致数据不一致时,应使用Mutex。例如,对一个全局变量的读写操作。
    • 当需要等待一组goroutine全部完成任务后再继续执行后续逻辑时,WaitGroup是更合适的选择。比如多个goroutine并发处理数据,处理完成后进行汇总统计。

WaitGroup与RWMutex

  1. 功能特点
    • RWMutex(读写互斥锁)是Mutex的一种变体,它区分了读操作和写操作。允许多个goroutine同时进行读操作,但在写操作时,会独占资源,不允许其他goroutine进行读或写操作。这使得在多读少写的场景下,能够提高程序的并发性能。
    • WaitGroup如前所述,主要用于goroutine的同步等待,与共享资源的读写控制无关。
  2. 适用场景
    • 在数据读取频繁而写入较少的场景中,RWMutex可以有效提高并发性能,例如读取配置文件等操作。
    • 而对于需要等待多个goroutine完成任务的场景,WaitGroup仍然是专门为此设计的工具。

WaitGroup与Cond

  1. 功能特点
    • Cond(条件变量)通常与Mutex配合使用,用于在某些条件满足时通知等待的goroutine。它可以让goroutine在等待某个条件时进入睡眠状态,当条件满足时,通过SignalBroadcast方法唤醒等待的goroutine
    • WaitGroup是基于计数器的同步机制,只要计数器变为0,等待的goroutine就会继续执行,不依赖于特定的条件判断。
  2. 适用场景
    • goroutine需要等待某个特定条件(如某个数据达到特定值、某个事件发生等)才能继续执行时,Cond结合Mutex是较好的选择。
    • 对于简单的并发任务同步等待,WaitGroup的使用更加简洁直接。

总结

WaitGroup是Go语言并发编程中一个非常实用的工具,它通过简单的计数器操作,实现了多个goroutine之间的同步等待。在实际应用中,我们可以根据具体的并发场景,灵活运用WaitGroup来解决复杂的同步问题。同时,我们也需要注意WaitGroup的使用细节,避免计数器操作错误、死锁等常见问题。与其他同步机制相比,WaitGroup有着独特的功能和适用场景,在Go语言的并发编程体系中占据着重要的地位。通过深入理解和熟练运用WaitGroup,我们能够编写出更加健壮、高效的并发程序。希望本文的内容能够帮助你更好地掌握WaitGroup在Go语言并发编程中的应用。