MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Go使用context管理并发控制的优雅实现

2024-06-092.9k 阅读

一、Go语言并发编程概述

1.1 Go语言并发模型

Go语言以其轻量级的并发模型而闻名,它通过goroutine来实现并发执行。goroutine类似于线程,但比线程更轻量级,创建和销毁的开销极小。在Go程序中,可以轻松地启动数以万计的goroutine。

例如,下面这个简单的示例展示了如何启动一个goroutine:

package main

import (
    "fmt"
)

func sayHello() {
    fmt.Println("Hello, from goroutine!")
}

func main() {
    go sayHello()
    fmt.Println("Main function")
}

在上述代码中,go sayHello() 语句启动了一个新的goroutine来执行 sayHello 函数。主函数继续执行并打印 “Main function”,而goroutine在后台执行并打印 “Hello, from goroutine!”。

1.2 并发控制的挑战

随着并发任务数量的增加,控制并发变得复杂起来。例如,当一个程序启动多个goroutine进行数据处理时,可能会遇到以下问题:

  1. 取消任务:当某个条件满足时,需要能够取消正在运行的goroutine。比如在一个HTTP请求处理中,客户端可能提前断开连接,此时相关的后台任务应该被取消。
  2. 设置超时:某些任务可能不能无限制地运行,需要设置一个超时时间。如果在规定时间内任务未完成,应该终止任务并返回相应的错误。
  3. 传递请求范围的上下文:在多个goroutine协作处理一个请求时,可能需要传递一些与请求相关的上下文信息,如请求ID、认证信息等。

传统的并发控制方法,如使用通道(channel)和共享变量加锁,虽然可以解决部分问题,但对于复杂的并发场景,代码会变得冗长且难以维护。

二、Context 简介

2.1 Context是什么

Context(上下文)是Go 1.7 引入的一个包,用于在多个goroutine之间传递请求范围的上下文信息,包括取消信号、超时时间等。它为处理并发控制提供了一种简洁而优雅的方式。

Context是一个接口类型,定义如下:

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}
}
  1. Deadline方法:返回当前Context的截止时间。如果截止时间已过,okfalse
  2. Done方法:返回一个只读通道,当Context被取消或超时的时候,这个通道会被关闭。
  3. Err方法:返回Context被取消的原因。如果Context未被取消,返回 nil
  4. Value方法:从Context中获取与给定键关联的值。

2.2 Context的类型

  1. Backgroundcontext.Background 是所有Context的根,通常用于主函数、初始化和测试代码中。它不会被取消,没有截止时间,也没有携带任何值。
  2. TODOcontext.TODO 用于在不确定应该使用哪种Context时暂时替代。它的语义和 Background 类似,但主要用于表明代码需要进一步重构以使用合适的Context。
  3. WithCancelcontext.WithCancel 用于创建一个可以取消的Context。它接受一个父Context,并返回一个新的Context和取消函数 CancelFunc。调用取消函数会关闭新Context的 Done 通道。
  4. WithDeadlinecontext.WithDeadline 用于创建一个带有截止时间的Context。它接受一个父Context和截止时间 deadline,返回新的Context和取消函数。当截止时间到达或者调用取消函数时,新Context会被取消。
  5. WithTimeoutcontext.WithTimeoutWithDeadline 的便捷版本,它接受一个父Context和超时时间 timeout,会自动计算截止时间并创建相应的Context和取消函数。
  6. WithValuecontext.WithValue 用于创建一个携带值的Context。它接受一个父Context和键值对,返回新的Context。可以通过 Value 方法从新Context中获取携带的值。

三、使用Context进行并发控制

3.1 取消任务

3.1.1 基本取消示例

下面的代码展示了如何使用 context.WithCancel 来取消一个goroutine:

package main

import (
    "context"
    "fmt"
    "time"
)

func worker(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Worker stopped")
            return
        default:
            fmt.Println("Worker is working")
            time.Sleep(1 * time.Second)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    go worker(ctx)

    time.Sleep(3 * time.Second)
    cancel()
    time.Sleep(1 * time.Second)
}

在这个例子中,worker 函数在一个无限循环中工作,通过 select 语句监听 ctx.Done() 通道。当 cancel 函数被调用时,ctx.Done() 通道关闭,worker 函数收到信号并停止工作。

3.1.2 多层级取消

在实际应用中,可能会有多个goroutine形成层级关系,一个父goroutine启动多个子goroutine,并且父goroutine取消时,所有子goroutine也应该取消。

package main

import (
    "context"
    "fmt"
    "time"
)

func childWorker(ctx context.Context, id int) {
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Child worker %d stopped\n", id)
            return
        default:
            fmt.Printf("Child worker %d is working\n", id)
            time.Sleep(1 * time.Second)
        }
    }
}

func parentWorker(ctx context.Context) {
    ctx1, cancel1 := context.WithCancel(ctx)
    ctx2, cancel2 := context.WithCancel(ctx)

    go childWorker(ctx1, 1)
    go childWorker(ctx2, 2)

    time.Sleep(3 * time.Second)
    cancel1()
    cancel2()
    time.Sleep(1 * time.Second)
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    go parentWorker(ctx)

    time.Sleep(5 * time.Second)
    cancel()
    time.Sleep(1 * time.Second)
}

在上述代码中,parentWorker 启动了两个 childWorker,每个 childWorker 都使用从父Context派生的新Context。当 parentWorker 中的取消函数被调用时,相应的子goroutine会收到取消信号并停止。而主函数中的取消函数被调用时,parentWorker 及其所有子goroutine都会停止。

3.2 设置超时

3.2.1 使用WithTimeout

context.WithTimeout 提供了一种简单的方式来为任务设置超时。

package main

import (
    "context"
    "fmt"
    "time"
)

func longRunningTask(ctx context.Context) {
    select {
    case <-ctx.Done():
        fmt.Println("Task timed out or cancelled")
        return
    case <-time.After(5 * time.Second):
        fmt.Println("Task completed")
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    go longRunningTask(ctx)

    time.Sleep(4 * time.Second)
}

在这个例子中,longRunningTask 函数通过 select 语句监听 ctx.Done() 通道和一个5秒的定时器。由于我们设置的超时时间是3秒,当3秒后 ctx.Done() 通道关闭,longRunningTask 函数收到信号并停止,打印 “Task timed out or cancelled”。

3.2.2 处理超时错误

在实际应用中,通常需要根据超时情况返回相应的错误。

package main

import (
    "context"
    "fmt"
    "time"
)

func longRunningTask(ctx context.Context) error {
    select {
    case <-ctx.Done():
        if ctx.Err() == context.DeadlineExceeded {
            return fmt.Errorf("task timed out")
        }
        return fmt.Errorf("task cancelled")
    case <-time.After(5 * time.Second):
        return nil
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    err := longRunningTask(ctx)
    if err != nil {
        fmt.Println(err)
    }
}

在上述代码中,longRunningTask 函数根据 ctx.Err() 判断任务是超时还是被取消,并返回相应的错误信息。主函数根据返回的错误进行处理。

3.3 传递请求范围的上下文信息

3.3.1 使用WithValue传递数据

context.WithValue 可以在多个goroutine之间传递请求相关的上下文信息。

package main

import (
    "context"
    "fmt"
)

func worker(ctx context.Context) {
    value := ctx.Value("requestID")
    if value != nil {
        fmt.Printf("Worker received requestID: %v\n", value)
    }
}

func main() {
    ctx := context.WithValue(context.Background(), "requestID", "12345")
    go worker(ctx)

    time.Sleep(1 * time.Second)
}

在这个例子中,主函数通过 context.WithValue 创建了一个携带 requestID 的Context,并传递给 worker 函数。worker 函数通过 ctx.Value 获取 requestID 并打印。

3.3.2 上下文信息的层级传递

上下文信息可以在多个层级的goroutine之间传递。

package main

import (
    "context"
    "fmt"
)

func childWorker(ctx context.Context) {
    value := ctx.Value("requestID")
    if value != nil {
        fmt.Printf("Child worker received requestID: %v\n", value)
    }
}

func parentWorker(ctx context.Context) {
    ctx = context.WithValue(ctx, "requestID", "67890")
    go childWorker(ctx)
}

func main() {
    ctx := context.WithValue(context.Background(), "requestID", "12345")
    go parentWorker(ctx)

    time.Sleep(1 * time.Second)
}

在上述代码中,主函数创建了一个携带 requestID 的Context,并传递给 parentWorkerparentWorker 又创建了一个新的Context并传递给 childWorkerchildWorker 可以获取到最终传递下来的 requestID

四、Context的最佳实践

4.1 尽早传递Context

在函数调用链中,应该尽早将Context作为参数传递,确保所有相关的goroutine都能接收到取消和超时信号。例如:

package main

import (
    "context"
    "fmt"
    "time"
)

func step1(ctx context.Context) {
    fmt.Println("Step 1 started")
    step2(ctx)
    fmt.Println("Step 1 ended")
}

func step2(ctx context.Context) {
    fmt.Println("Step 2 started")
    time.Sleep(2 * time.Second)
    fmt.Println("Step 2 ended")
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()

    go step1(ctx)

    time.Sleep(3 * time.Second)
}

在这个例子中,step1 函数尽早将Context传递给 step2,当主函数设置的超时时间到达时,step2 函数能够及时收到取消信号并停止工作。

4.2 不要将Context放在结构体中

Context应该作为函数参数传递,而不是放在结构体中。因为结构体可能会在不同的地方被使用,将Context放在结构体中会导致Context的生命周期管理变得复杂,并且难以保证所有使用该结构体的地方都能正确处理Context的取消和超时。

错误示例:

package main

import (
    "context"
    "fmt"
)

type MyStruct struct {
    ctx context.Context
}

func (m *MyStruct) doWork() {
    // 这里处理工作,但是无法正确处理ctx的取消等情况
    fmt.Println("Doing work")
}

func main() {
    ctx := context.Background()
    m := MyStruct{ctx: ctx}
    m.doWork()
}

正确示例:

package main

import (
    "context"
    "fmt"
)

type MyStruct struct {
    // 结构体中不包含Context
}

func (m *MyStruct) doWork(ctx context.Context) {
    select {
    case <-ctx.Done():
        fmt.Println("Work cancelled")
        return
    default:
        fmt.Println("Doing work")
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    m := MyStruct{}
    go m.doWork(ctx)

    time.Sleep(1 * time.Second)
    cancel()
    time.Sleep(1 * time.Second)
}

4.3 避免在全局变量中使用Context

全局变量的生命周期与整个程序相同,而Context是用于请求范围的控制。在全局变量中使用Context可能会导致Context的取消和超时无法正确生效,并且难以理解和维护代码逻辑。

错误示例:

package main

import (
    "context"
    "fmt"
)

var globalCtx context.Context

func init() {
    globalCtx = context.Background()
}

func doWork() {
    // 这里使用全局Ctx,无法正确处理取消和超时
    fmt.Println("Doing work with global ctx")
}

func main() {
    doWork()
}

正确做法是在需要的地方创建和传递Context,以确保其正确的生命周期管理。

4.4 注意Context的内存泄漏

如果一个goroutine持有一个Context,但没有正确处理取消信号,可能会导致内存泄漏。例如,在一个使用 context.WithCancel 创建的Context中,如果没有调用取消函数,相关的资源(如文件描述符、网络连接等)可能不会被释放。

为了避免内存泄漏,确保在不需要Context时及时调用取消函数。例如,在使用 defer 语句来确保取消函数一定会被调用:

package main

import (
    "context"
    "fmt"
    "time"
)

func worker(ctx context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    // 执行工作
    select {
    case <-ctx.Done():
        fmt.Println("Worker cancelled")
        return
    case <-time.After(5 * time.Second):
        fmt.Println("Worker completed")
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    go worker(ctx)

    time.Sleep(4 * time.Second)
}

在上述代码中,worker 函数内部创建了一个新的Context,并使用 defer cancel() 确保在函数结束时取消该Context,避免潜在的内存泄漏。

五、Context与其他并发原语的结合使用

5.1 Context与Channel

Context和Channel可以很好地结合使用。例如,在一个生产者 - 消费者模型中,使用Context来控制整个流程的取消,同时使用Channel来传递数据。

package main

import (
    "context"
    "fmt"
    "time"
)

func producer(ctx context.Context, ch chan int) {
    for i := 1; i <= 10; i++ {
        select {
        case <-ctx.Done():
            close(ch)
            return
        case ch <- i:
            fmt.Printf("Produced: %d\n", i)
            time.Sleep(1 * time.Second)
        }
    }
    close(ch)
}

func consumer(ctx context.Context, ch chan int) {
    for {
        select {
        case <-ctx.Done():
            return
        case val, ok := <-ch:
            if!ok {
                return
            }
            fmt.Printf("Consumed: %d\n", val)
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    ch := make(chan int)

    go producer(ctx, ch)
    go consumer(ctx, ch)

    time.Sleep(7 * time.Second)
}

在这个例子中,producer 函数根据Context的取消信号来决定是否继续生产数据并关闭Channel。consumer 函数根据Context的取消信号和Channel的关闭状态来决定是否停止消费数据。

5.2 Context与Mutex

在使用共享资源时,通常会使用Mutex(互斥锁)来保护资源。Context可以与Mutex结合使用,在Context取消时,确保正确释放锁资源。

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

var mu sync.Mutex
var sharedResource int

func modifyResource(ctx context.Context) {
    mu.Lock()
    defer mu.Unlock()

    select {
    case <-ctx.Done():
        fmt.Println("Modify resource cancelled")
        return
    default:
        sharedResource++
        fmt.Printf("Modified resource: %d\n", sharedResource)
        time.Sleep(3 * time.Second)
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()

    go modifyResource(ctx)

    time.Sleep(2 * time.Second)
}

在上述代码中,modifyResource 函数在修改共享资源前获取Mutex锁,并通过Context来判断是否需要取消操作。当Context超时取消时,函数会正确释放锁资源。

六、Context在实际项目中的应用场景

6.1 HTTP服务器

在HTTP服务器中,Context被广泛用于处理请求的生命周期。例如,当客户端断开连接时,服务器可以通过Context取消相关的后台任务,避免资源浪费。

package main

import (
    "context"
    "fmt"
    "net/http"
    "time"
)

func longRunningHandler(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()

    ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()

    go func() {
        select {
        case <-ctx.Done():
            fmt.Println("Long running task cancelled")
            return
        case <-time.After(10 * time.Second):
            fmt.Println("Long running task completed")
        }
    }()

    fmt.Fprintf(w, "Request in progress")
}

func main() {
    http.HandleFunc("/long-running", longRunningHandler)
    fmt.Println("Server listening on :8080")
    http.ListenAndServe(":8080", nil)
}

在这个例子中,longRunningHandler 函数从HTTP请求中获取Context,并创建一个带有超时的新Context。如果请求在5秒内结束(如客户端断开连接),相关的后台任务会被取消。

6.2 数据库操作

在进行数据库操作时,Context可以用于控制操作的超时。例如,在查询数据库时,如果查询时间过长,可以通过Context取消查询。

package main

import (
    "context"
    "fmt"
    "time"

    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
    if err != nil {
        fmt.Println("Failed to connect to MongoDB:", err)
        return
    }
    defer func() {
        if err := client.Disconnect(ctx); err != nil {
            fmt.Println("Failed to disconnect from MongoDB:", err)
        }
    }()

    collection := client.Database("test").Collection("users")
    var result bson.M
    err = collection.FindOne(ctx, bson.D{}).Decode(&result)
    if err != nil {
        if err == context.DeadlineExceeded {
            fmt.Println("Database query timed out")
        } else {
            fmt.Println("Database query error:", err)
        }
        return
    }
    fmt.Println("Query result:", result)
}

在上述代码中,使用 context.WithTimeout 为数据库连接和查询操作设置超时时间。如果操作在规定时间内未完成,会根据Context的状态返回相应的错误。

6.3 微服务调用

在微服务架构中,Context可以在服务之间传递,以确保整个调用链的一致性。例如,在一个服务调用另一个服务时,可以将请求的Context传递过去,使得下游服务能够根据上游的取消信号或超时设置进行相应的处理。

// 假设这是下游服务
package main

import (
    "context"
    "fmt"
    "time"
)

func downstreamService(ctx context.Context) {
    select {
    case <-ctx.Done():
        fmt.Println("Downstream service cancelled")
        return
    case <-time.After(5 * time.Second):
        fmt.Println("Downstream service completed")
    }
}

// 假设这是上游服务
package main

import (
    "context"
    "fmt"
    "time"
)

func upstreamService(ctx context.Context) {
    ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
    defer cancel()

    go downstreamService(ctx)

    time.Sleep(4 * time.Second)
}

func main() {
    ctx := context.Background()
    upstreamService(ctx)
}

在这个例子中,upstreamService 创建了一个带有超时的Context并传递给 downstreamService。当 upstreamService 的超时时间到达时,downstreamService 会收到取消信号并停止。

通过以上内容,我们详细介绍了Go语言中使用Context进行并发控制的方法、最佳实践以及在实际项目中的应用场景。Context为Go语言的并发编程提供了一种优雅且强大的方式,使得处理复杂的并发场景变得更加简单和可靠。