MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

go 中管理 goroutine 生命周期的最佳实践

2023-03-213.8k 阅读

使用 context 控制 goroutine 生命周期

在 Go 语言中,context 包提供了一种优雅且高效的方式来管理 goroutine 的生命周期。context 主要用于在 goroutine 树中传递截止日期、取消信号和其他请求范围的值。

1. context 基本原理

context 是一个接口类型,定义如下:

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}
}
  • Deadline 方法返回 context 的截止时间。如果截止时间未设置,ok 返回 false
  • Done 方法返回一个只读通道,当 context 被取消或超时,该通道会被关闭。
  • Err 方法返回 context 被取消的原因。如果 context 未被取消,返回 nil
  • Value 方法用于从 context 中获取特定键的值。

2. 取消单个 goroutine

假设我们有一个简单的 goroutine,它模拟一个长时间运行的任务:

package main

import (
    "context"
    "fmt"
    "time"
)

func worker(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Worker received cancel signal, exiting...")
            return
        default:
            fmt.Println("Worker is working...")
            time.Sleep(1 * time.Second)
        }
    }
}

在主函数中,我们可以使用 context.WithCancel 来创建一个可取消的 context,并在需要时取消它:

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    go worker(ctx)

    time.Sleep(3 * time.Second)
    cancel()
    time.Sleep(1 * time.Second)
}

在上述代码中,context.WithCancel 基于 context.Background() 创建了一个可取消的 context。worker goroutine 会在每次循环中检查 ctx.Done() 通道是否关闭。如果关闭,说明收到取消信号,于是退出循环。主函数中,3 秒后调用 cancel() 函数,向所有基于该 context 创建的子 context 发送取消信号。

3. 超时控制

有时候我们希望 goroutine 在一定时间内完成任务,否则自动取消。可以使用 context.WithTimeout 来实现:

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    go func(ctx context.Context) {
        select {
        case <-ctx.Done():
            fmt.Println("Operation timed out")
        case <-time.After(3 * time.Second):
            fmt.Println("Operation completed within time limit")
        }
    }(ctx)

    time.Sleep(3 * time.Second)
}

这里 context.WithTimeout 创建了一个 2 秒超时的 context。在 goroutine 中,select 语句等待 ctx.Done() 通道(超时或手动取消时会关闭)或 3 秒定时器通道。由于设置的超时时间为 2 秒,所以大概率会打印 “Operation timed out”。

4. 传递 context

context 可以在 goroutine 树中传递,确保所有相关的 goroutine 能同步收到取消或超时信号。

func subWorker(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Sub - worker received cancel signal, exiting...")
            return
        default:
            fmt.Println("Sub - worker is working...")
            time.Sleep(1 * time.Second)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    go func(ctx context.Context) {
        go subWorker(ctx)
        time.Sleep(3 * time.Second)
        cancel()
    }(ctx)

    time.Sleep(4 * time.Second)
}

main 函数中,创建了一个可取消的 context,并启动一个 goroutine。该 goroutine 又启动了一个 subWorker goroutine。当主 goroutine 调用 cancel() 时,subWorker 也会收到取消信号并退出。

使用 WaitGroup 等待 goroutine 完成

WaitGroup 是 Go 标准库中用于等待一组 goroutine 完成的工具。它可以让主 goroutine 阻塞,直到所有相关的 goroutine 都执行完毕。

1. WaitGroup 基本原理

WaitGroup 内部维护一个计数器,通过 Add 方法增加计数,Done 方法减少计数,Wait 方法阻塞直到计数器为 0。

type WaitGroup struct {
    noCopy noCopy
    state1 [3]uint32
}

state1 字段存储计数器的值和等待队列的信息。

2. 简单示例

假设我们有多个 goroutine 同时执行任务,并且希望主 goroutine 在所有任务完成后再继续执行:

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Println("Worker started")
    time.Sleep(2 * time.Second)
    fmt.Println("Worker finished")
}

在主函数中:

func main() {
    var wg sync.WaitGroup
    wg.Add(3)

    go worker(&wg)
    go worker(&wg)
    go worker(&wg)

    wg.Wait()
    fmt.Println("All workers have finished")
}

在上述代码中,wg.Add(3) 将计数器设置为 3,每个 worker goroutine 在开始时调用 defer wg.Done(),表示任务完成,会减少计数器的值。wg.Wait() 会阻塞主 goroutine,直到计数器变为 0,即所有 worker goroutine 都调用了 Done

3. 动态添加 goroutine

WaitGroup 也支持动态添加 goroutine。

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(index int) {
            defer wg.Done()
            fmt.Printf("Worker %d started\n", index)
            time.Sleep(time.Duration(index) * time.Second)
            fmt.Printf("Worker %d finished\n", index)
        }(i)
    }
    wg.Wait()
    fmt.Println("All workers have finished")
}

在这个循环中,每次启动一个新的 goroutine 时,调用 wg.Add(1) 增加计数器。每个 goroutine 完成后调用 wg.Done() 减少计数器。主 goroutine 通过 wg.Wait() 等待所有 goroutine 完成。

使用 channel 同步 goroutine

channel 是 Go 语言中用于 goroutine 之间通信和同步的重要机制。通过 channel 可以实现数据的传递,同时也可以用于控制 goroutine 的生命周期。

1. 基本的 channel 同步

假设我们有一个 goroutine 负责生成数据,另一个负责消费数据:

package main

import (
    "fmt"
)

func producer(ch chan int) {
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch)
}

func consumer(ch chan int) {
    for num := range ch {
        fmt.Println("Consumed:", num)
    }
}

在主函数中:

func main() {
    ch := make(chan int)
    go producer(ch)
    go consumer(ch)

    select {}
}

在上述代码中,producer goroutine 通过 ch <- i 向 channel 发送数据,完成后调用 close(ch) 关闭 channel。consumer goroutine 使用 for... range 循环从 channel 读取数据,当 channel 关闭时,循环自动结束。主函数中的 select {} 用于阻塞主 goroutine,防止程序提前退出。

2. 使用 channel 控制 goroutine 结束

我们可以利用 channel 来通知 goroutine 结束。

func worker(stop chan struct{}) {
    for {
        select {
        case <-stop:
            fmt.Println("Worker received stop signal, exiting...")
            return
        default:
            fmt.Println("Worker is working...")
        }
    }
}

在主函数中:

func main() {
    stop := make(chan struct{})
    go worker(stop)

    // 模拟一些工作
    // 然后发送停止信号
    time.Sleep(3 * time.Second)
    close(stop)
    time.Sleep(1 * time.Second)
}

这里创建了一个 stop channel,worker goroutine 在每次循环中检查 stop channel 是否有数据或关闭信号。主函数在 3 秒后关闭 stop channel,worker goroutine 收到信号后退出。

3. 多 goroutine 同步

假设有多个 goroutine 协同工作,我们可以使用 channel 来同步它们的操作。

func worker1(ch1 chan int, ch2 chan struct{}) {
    ch1 <- 10
    <-ch2
    fmt.Println("Worker1 resumed after receiving signal from worker2")
}

func worker2(ch1 chan int, ch2 chan struct{}) {
    num := <-ch1
    fmt.Println("Worker2 received:", num)
    ch2 <- struct{}{}
}

在主函数中:

func main() {
    ch1 := make(chan int)
    ch2 := make(chan struct{})

    go worker1(ch1, ch2)
    go worker2(ch1, ch2)

    select {}
}

worker1 首先向 ch1 发送数据,然后等待 ch2 信号。worker2ch1 读取数据,处理后向 ch2 发送信号。通过这种方式,两个 goroutine 实现了同步。

错误处理与 goroutine 生命周期管理

在实际应用中,goroutine 执行过程中可能会出现错误。正确处理这些错误并合理管理 goroutine 的生命周期至关重要。

1. 通过 channel 传递错误

我们可以定义一个专门的 channel 用于传递错误信息。

package main

import (
    "fmt"
)

func worker(errCh chan error) {
    // 模拟一个可能出错的操作
    if true {
        errCh <- fmt.Errorf("An error occurred in worker")
        return
    }
    errCh <- nil
}

在主函数中:

func main() {
    errCh := make(chan error)
    go worker(errCh)

    err := <-errCh
    if err != nil {
        fmt.Println("Error:", err)
    } else {
        fmt.Println("Worker completed successfully")
    }
}

在上述代码中,worker goroutine 通过 errCh 发送错误信息。主函数从 errCh 读取错误,如果不为 nil,则处理错误。

2. 使用 context 处理错误

结合 context,我们可以在 goroutine 因错误取消时进行更全面的处理。

func worker(ctx context.Context, errCh chan error) {
    select {
    case <-ctx.Done():
        errCh <- ctx.Err()
        return
    default:
        // 模拟一个可能出错的操作
        if true {
            errCh <- fmt.Errorf("An error occurred in worker")
            return
        }
        errCh <- nil
    }
}

在主函数中:

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    errCh := make(chan error)
    go worker(ctx, errCh)

    err := <-errCh
    if err != nil {
        fmt.Println("Error:", err)
        cancel()
    } else {
        fmt.Println("Worker completed successfully")
    }
}

这里 worker goroutine 不仅通过 errCh 发送错误,还会在 ctx.Done() 通道关闭时发送 ctx.Err()。主函数在收到错误时,调用 cancel() 取消 context,确保所有相关的 goroutine 都能收到取消信号并安全退出。

3. 错误处理与 WaitGroup 结合

当有多个 goroutine 时,我们可以结合 WaitGroup 和错误处理。

func worker(wg *sync.WaitGroup, errCh chan error) {
    defer wg.Done()
    // 模拟一个可能出错的操作
    if true {
        errCh <- fmt.Errorf("An error occurred in worker")
        return
    }
    errCh <- nil
}

在主函数中:

func main() {
    var wg sync.WaitGroup
    errCh := make(chan error)

    numWorkers := 3
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go worker(&wg, errCh)
    }

    go func() {
        wg.Wait()
        close(errCh)
    }()

    for err := range errCh {
        if err != nil {
            fmt.Println("Error:", err)
        }
    }
}

在这个例子中,每个 worker goroutine 完成后调用 wg.Done(),主函数通过 wg.Wait() 等待所有 goroutine 完成。同时,worker goroutine 通过 errCh 发送错误信息,主函数从 errCh 中读取并处理错误。

资源管理与 goroutine 生命周期

在 goroutine 执行过程中,可能会涉及到资源的分配和释放,如文件句柄、数据库连接等。合理管理这些资源与 goroutine 的生命周期紧密相关。

1. 使用 defer 释放资源

当一个 goroutine 打开了资源,如文件,我们可以使用 defer 语句在 goroutine 结束时释放资源。

package main

import (
    "fmt"
    "os"
)

func fileWorker() {
    file, err := os.Open("test.txt")
    if err != nil {
        fmt.Println("Error opening file:", err)
        return
    }
    defer file.Close()

    // 处理文件内容
    fmt.Println("File opened successfully, processing...")
}

在上述代码中,defer file.Close() 确保无论 fileWorker goroutine 以何种方式结束(正常结束或因错误提前结束),文件都会被关闭。

2. 资源池与 goroutine

在高并发场景下,资源池可以有效管理资源的分配和回收。以数据库连接池为例,假设我们有一个简单的连接池实现:

package main

import (
    "database/sql"
    "fmt"
    "sync"

    _ "github.com/go - sql - driver/mysql"
)

type ConnectionPool struct {
    pool     chan *sql.DB
    maxConns int
}

func NewConnectionPool(dsn string, maxConns int) (*ConnectionPool, error) {
    pool := make(chan *sql.DB, maxConns)
    for i := 0; i < maxConns; i++ {
        db, err := sql.Open("mysql", dsn)
        if err != nil {
            close(pool)
            return nil, err
        }
        pool <- db
    }
    return &ConnectionPool{
        pool:     pool,
        maxConns: maxConns,
    }, nil
}

func (cp *ConnectionPool) GetConnection() *sql.DB {
    return <-cp.pool
}

func (cp *ConnectionPool) ReturnConnection(db *sql.DB) {
    cp.pool <- db
}

在 goroutine 中使用连接池:

func dbWorker(cp *ConnectionPool) {
    db := cp.GetConnection()
    defer cp.ReturnConnection(db)

    // 使用数据库连接执行操作
    rows, err := db.Query("SELECT * FROM users")
    if err != nil {
        fmt.Println("Error querying database:", err)
        return
    }
    defer rows.Close()

    // 处理查询结果
    for rows.Next() {
        // 处理每一行数据
    }
}

在这个例子中,ConnectionPool 管理数据库连接的创建、获取和回收。dbWorker goroutine 从连接池获取连接,使用完毕后通过 defer 语句归还连接,确保连接资源的正确管理。

3. 资源清理与 context

结合 context,我们可以在 goroutine 取消时进行资源清理。

func resourceWorker(ctx context.Context, cp *ConnectionPool) {
    db := cp.GetConnection()
    defer cp.ReturnConnection(db)

    select {
    case <-ctx.Done():
        // 进行额外的资源清理,如取消未完成的事务
        fmt.Println("Resource worker received cancel signal, cleaning up...")
        return
    default:
        // 使用数据库连接执行操作
        rows, err := db.Query("SELECT * FROM users")
        if err != nil {
            fmt.Println("Error querying database:", err)
            return
        }
        defer rows.Close()

        // 处理查询结果
        for rows.Next() {
            // 处理每一行数据
        }
    }
}

在上述代码中,resourceWorker goroutine 会在收到 ctx.Done() 信号时,进行额外的资源清理工作,如取消未完成的数据库事务,然后安全退出。

总结最佳实践要点

  1. 使用 context 进行取消和超时控制:在大多数情况下,context 是管理 goroutine 生命周期的首选方式。它能够在 goroutine 树中高效传递取消和超时信号,确保所有相关的 goroutine 能及时响应。在设计并发程序时,应从顶层 goroutine 向下传递 context,使得每个子 goroutine 都能感知到取消或超时信号。
  2. 结合 WaitGroup 等待 goroutine 完成:当需要等待一组 goroutine 全部完成后再继续执行时,WaitGroup 是非常实用的工具。在启动 goroutine 前,合理设置 WaitGroup 的计数器,并在每个 goroutine 结束时调用 Done 方法。注意避免在计数器为 0 后再次调用 Add 方法,以免导致 Wait 方法死锁。
  3. 利用 channel 进行同步和通信:channel 不仅可以用于 goroutine 之间的数据传递,还能实现同步和控制 goroutine 的生命周期。通过向 channel 发送信号或关闭 channel,可以通知 goroutine 停止或进行特定的操作。在使用 channel 时,要注意正确处理 channel 的关闭,避免出现发送到已关闭 channel 或从已关闭 channel 读取数据的错误。
  4. 妥善处理错误:在 goroutine 中,应通过合适的方式传递和处理错误。可以使用专门的 error channel 来传递错误信息,结合 context 来处理因错误导致的取消操作。同时,在处理多个 goroutine 的错误时,要确保每个 goroutine 的错误都能被正确捕获和处理。
  5. 合理管理资源:对于在 goroutine 中使用的资源,如文件句柄、数据库连接等,要确保在 goroutine 结束时正确释放。使用 defer 语句可以方便地实现资源的自动释放。在高并发场景下,考虑使用资源池来提高资源的利用率和管理效率,并结合 context 在 goroutine 取消时进行全面的资源清理。

通过遵循这些最佳实践,可以编写出健壮、高效且易于维护的并发 Go 程序,有效管理 goroutine 的生命周期,避免常见的并发问题。