MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Go使用context管理并发

2024-11-272.3k 阅读

Go语言并发编程基础

在深入探讨Go语言中 context 对并发的管理之前,先来回顾一下Go语言并发编程的基础概念。

Goroutine

Goroutine 是Go语言中实现并发的核心机制。它类似于线程,但更轻量级。创建一个Goroutine非常简单,只需在函数调用前加上 go 关键字即可。例如:

package main

import (
    "fmt"
    "time"
)

func worker() {
    for i := 0; i < 5; i++ {
        fmt.Println("Worker:", i)
        time.Sleep(time.Second)
    }
}

func main() {
    go worker()
    time.Sleep(6 * time.Second)
    fmt.Println("Main function exiting")
}

在上述代码中,go worker() 启动了一个新的Goroutine来执行 worker 函数。主函数在启动Goroutine后继续执行,而 worker 函数中的循环会异步运行。time.Sleep 用于模拟一些工作,防止程序过早退出。

通道(Channel)

通道是Goroutine之间进行通信和同步的重要工具。通道可以传递数据,通过通道可以实现不同Goroutine之间的数据共享和协作。例如,下面是一个简单的通道示例:

package main

import (
    "fmt"
)

func sender(ch chan int) {
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch)
}

func receiver(ch chan int) {
    for num := range ch {
        fmt.Println("Received:", num)
    }
}

func main() {
    ch := make(chan int)
    go sender(ch)
    go receiver(ch)
    // 防止主函数过早退出
    select {}
}

在这个例子中,sender Goroutine 向通道 ch 发送数据,receiver Goroutine 从通道 ch 接收数据。range 关键字在通道上迭代,直到通道被关闭。close(ch) 用于关闭通道,通知接收方不再有数据发送。

并发编程中的问题

随着并发任务的增加,会出现一些问题,比如资源竞争、任务取消困难等。

资源竞争

资源竞争是指多个Goroutine同时访问和修改共享资源时可能出现的数据不一致问题。例如:

package main

import (
    "fmt"
    "sync"
)

var counter int
var wg sync.WaitGroup

func increment() {
    defer wg.Done()
    for i := 0; i < 1000; i++ {
        counter++
    }
}

func main() {
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go increment()
    }
    wg.Wait()
    fmt.Println("Final counter:", counter)
}

在上述代码中,counter 是一个共享资源,多个 increment Goroutine 同时对其进行累加操作。由于没有适当的同步机制,每次运行程序得到的 counter 最终值可能都不一样,这就是资源竞争的表现。解决资源竞争通常需要使用互斥锁(如 sync.Mutex)等同步工具。

任务取消困难

在并发编程中,有时需要在特定条件下取消正在运行的任务。例如,一个长时间运行的计算任务,当用户点击取消按钮时,希望能够终止该任务。但在没有合适机制的情况下,实现任务取消会很困难。传统的做法可能是设置一个共享的标志变量,各个Goroutine不断检查该标志来决定是否停止,但这种方法不够优雅且容易出错。例如:

package main

import (
    "fmt"
    "time"
)

var stop bool

func longRunningTask() {
    for!stop {
        fmt.Println("Task is running...")
        time.Sleep(time.Second)
    }
    fmt.Println("Task stopped")
}

func main() {
    go longRunningTask()
    time.Sleep(3 * time.Second)
    stop = true
    time.Sleep(time.Second)
    fmt.Println("Main function exiting")
}

在这个例子中,stop 是一个共享的标志变量,主函数通过设置 stoptrue 来尝试停止 longRunningTask。然而,这种方式依赖于 longRunningTask 主动且频繁地检查 stop 变量,而且如果有多个地方修改 stop 变量,还可能引发资源竞争问题。

Context 简介

context 包是Go 1.7 引入的,用于在多个Goroutine之间传递截止时间、取消信号和其他请求范围的值。它提供了一种优雅且安全的方式来管理并发任务的生命周期。

Context 接口

context 包定义了一个 Context 接口,该接口包含四个方法:

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}
}
  1. Deadline 方法:返回当前 Context 的截止时间。如果没有设置截止时间,ok 返回 false
  2. Done 方法:返回一个只读的通道 <-chan struct{}。当 Context 被取消或超时时,该通道会被关闭。
  3. Err 方法:返回 Context 被取消或超时时的错误原因。如果 Context 尚未取消或超时,返回 nil
  4. Value 方法:从 Context 中获取与给定 key 关联的值。通常用于在不同Goroutine之间传递请求范围内的数据。

背景 Context

context.Backgroundcontext.TODO 是两个用于创建根 Context 的函数。

  • context.Background:通常用于主函数、初始化和测试代码中,作为所有 Context 树的根。
  • context.TODO:用于暂时不知道该使用哪种 Context 的情况,例如在未来某个时候会传入具体的 Context

例如:

package main

import (
    "context"
    "fmt"
)

func main() {
    ctx := context.Background()
    fmt.Println(ctx)
    todoCtx := context.TODO()
    fmt.Println(todoCtx)
}

在上述代码中,ctxtodoCtx 分别是通过 context.Backgroundcontext.TODO 创建的根 Context。它们本身没有携带任何截止时间、取消信号或值,但可以作为其他 Context 的基础进行衍生。

使用 Context 管理并发

取消 Goroutine

context 最常见的用途之一就是取消Goroutine。通过 context.WithCancel 函数可以创建一个可取消的 Context。例如:

package main

import (
    "context"
    "fmt"
    "time"
)

func worker(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Worker stopped")
            return
        default:
            fmt.Println("Worker is working...")
            time.Sleep(time.Second)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    go worker(ctx)
    time.Sleep(3 * time.Second)
    cancel()
    time.Sleep(time.Second)
    fmt.Println("Main function exiting")
}

在这个例子中,context.WithCancel 创建了一个可取消的 Context 和对应的取消函数 cancelworker Goroutine 通过 select 语句监听 ctx.Done() 通道。当主函数调用 cancel 函数时,ctx.Done() 通道被关闭,worker Goroutine 接收到信号后停止工作。

设置截止时间

通过 context.WithTimeoutcontext.WithDeadline 函数可以为 Context 设置截止时间。context.WithTimeout 设置的是从当前时间开始的超时时间,而 context.WithDeadline 设置的是一个具体的截止时间点。

使用 context.WithTimeout

package main

import (
    "context"
    "fmt"
    "time"
)

func longTask(ctx context.Context) {
    select {
    case <-ctx.Done():
        fmt.Println("Task cancelled due to timeout:", ctx.Err())
        return
    case <-time.After(5 * time.Second):
        fmt.Println("Task completed")
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    go longTask(ctx)
    time.Sleep(4 * time.Second)
    fmt.Println("Main function exiting")
}

在上述代码中,context.WithTimeout 创建了一个 Context,设置超时时间为3秒。longTask 函数通过 select 语句监听 ctx.Done() 通道和一个5秒的定时器。由于超时时间设置为3秒,在3秒后 ctx.Done() 通道被关闭,longTask 函数接收到取消信号并输出相应信息。

使用 context.WithDeadline

package main

import (
    "context"
    "fmt"
    "time"
)

func longTask(ctx context.Context) {
    select {
    case <-ctx.Done():
        fmt.Println("Task cancelled due to deadline:", ctx.Err())
        return
    case <-time.After(5 * time.Second):
        fmt.Println("Task completed")
    }
}

func main() {
    deadline := time.Now().Add(3 * time.Second)
    ctx, cancel := context.WithDeadline(context.Background(), deadline)
    defer cancel()
    go longTask(ctx)
    time.Sleep(4 * time.Second)
    fmt.Println("Main function exiting")
}

这里通过 context.WithDeadline 创建了一个 Context,截止时间设置为当前时间3秒后。longTask 函数同样通过 select 语句监听 ctx.Done() 通道,在截止时间到达后,ctx.Done() 通道被关闭,longTask 函数收到取消信号。

在多个 Goroutine 间传递 Context

Context 可以在多个Goroutine之间传递,从而实现对整个任务链的统一管理。例如:

package main

import (
    "context"
    "fmt"
    "time"
)

func task1(ctx context.Context) {
    fmt.Println("Task1 started")
    select {
    case <-ctx.Done():
        fmt.Println("Task1 cancelled:", ctx.Err())
        return
    case <-time.After(2 * time.Second):
        fmt.Println("Task1 completed")
    }
    task2(ctx)
}

func task2(ctx context.Context) {
    fmt.Println("Task2 started")
    select {
    case <-ctx.Done():
        fmt.Println("Task2 cancelled:", ctx.Err())
        return
    case <-time.After(2 * time.Second):
        fmt.Println("Task2 completed")
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    go task1(ctx)
    time.Sleep(4 * time.Second)
    fmt.Println("Main function exiting")
}

在这个例子中,task1 函数在执行完毕后调用 task2 函数,并将同一个 Context 传递下去。当主函数设置的超时时间到达后,ctx.Done() 通道被关闭,task1task2 函数都会收到取消信号并停止执行。

通过 Context 传递值

ContextValue 方法可以在不同Goroutine之间传递请求范围内的值。例如:

package main

import (
    "context"
    "fmt"
)

type requestKey string

func processRequest(ctx context.Context) {
    value := ctx.Value(requestKey("userID")).(string)
    fmt.Printf("Processing request for user: %s\n", value)
}

func main() {
    ctx := context.WithValue(context.Background(), requestKey("userID"), "12345")
    go processRequest(ctx)
    // 防止主函数过早退出
    select {}
}

在上述代码中,通过 context.WithValue 创建了一个新的 Context,并将 userID 作为值与 requestKey("userID") 关联。processRequest 函数通过 ctx.Value 获取这个值,并进行相应的处理。

Context 使用注意事项

正确传递 Context

在函数调用链中,要确保 Context 被正确传递。如果某个函数需要 Context,应该将其作为参数显式传递,而不是在函数内部创建新的 Context。例如:

package main

import (
    "context"
    "fmt"
    "time"
)

func innerTask(ctx context.Context) {
    select {
    case <-ctx.Done():
        fmt.Println("Inner task cancelled:", ctx.Err())
        return
    case <-time.After(2 * time.Second):
        fmt.Println("Inner task completed")
    }
}

func outerTask(ctx context.Context) {
    fmt.Println("Outer task started")
    innerTask(ctx)
    fmt.Println("Outer task completed")
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    go outerTask(ctx)
    time.Sleep(4 * time.Second)
    fmt.Println("Main function exiting")
}

在这个例子中,outerTask 函数将 ctx 传递给 innerTask 函数,这样当 ctx 被取消时,innerTask 也能收到取消信号。

避免在全局变量中使用 Context

Context 作为全局变量使用可能会导致难以追踪其生命周期和取消逻辑。Context 应该与具体的请求或任务相关联,并在函数调用中传递。例如,不要这样做:

package main

import (
    "context"
    "fmt"
    "time"
)

var globalCtx context.Context

func init() {
    globalCtx, _ = context.WithTimeout(context.Background(), 5*time.Second)
}

func task() {
    select {
    case <-globalCtx.Done():
        fmt.Println("Task cancelled:", globalCtx.Err())
        return
    case <-time.After(3 * time.Second):
        fmt.Println("Task completed")
    }
}

func main() {
    go task()
    time.Sleep(6 * time.Second)
    fmt.Println("Main function exiting")
}

在上述代码中,globalCtx 作为全局变量,难以确定其在不同场景下的取消逻辑。更好的做法是将 Context 作为参数传递给 task 函数。

注意 Context 的嵌套

当创建多个嵌套的 Context 时,要注意取消逻辑。外层 Context 的取消会级联取消内层 Context,但内层 Context 的取消不一定会影响外层 Context。例如:

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    outerCtx, outerCancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer outerCancel()
    innerCtx, innerCancel := context.WithTimeout(outerCtx, 3*time.Second)
    defer innerCancel()

    go func() {
        select {
        case <-innerCtx.Done():
            fmt.Println("Inner context cancelled:", innerCtx.Err())
        case <-time.After(4 * time.Second):
            fmt.Println("Inner task completed")
        }
    }()

    time.Sleep(4 * time.Second)
    fmt.Println("Main function exiting")
}

在这个例子中,innerCtx 基于 outerCtx 创建,且设置了更短的超时时间。当 innerCtx 超时取消时,不会影响 outerCtx。但如果 outerCtx 先取消,innerCtx 也会收到取消信号。

通过合理使用 context,Go语言开发者可以更加优雅、安全地管理并发任务,避免常见的并发问题,提高程序的健壮性和可维护性。在实际项目中,深入理解并熟练运用 context 的各种特性,对于编写高效、可靠的并发程序至关重要。无论是简单的Goroutine取消,还是复杂的任务链管理和值传递,context 都提供了强大而灵活的解决方案。