MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

go 中封装并发逻辑的设计模式

2022-07-041.7k 阅读

基于 Channel 的并发逻辑封装

生产者 - 消费者模式

在 Go 语言中,生产者 - 消费者模式是一种常用的并发设计模式,它通过 Channel 来解耦数据的生产和消费过程。在这个模式中,生产者负责生成数据并将其发送到 Channel 中,而消费者则从 Channel 中接收数据并进行处理。

package main

import (
    "fmt"
)

// 生产者函数,向 channel 发送数据
func producer(out chan<- int) {
    for i := 0; i < 10; i++ {
        out <- i
    }
    close(out)
}

// 消费者函数,从 channel 接收数据并处理
func consumer(in <-chan int) {
    for num := range in {
        fmt.Println("Consumed:", num)
    }
}

func main() {
    ch := make(chan int)

    go producer(ch)
    go consumer(ch)

    select {}
}

在上述代码中,producer 函数作为生产者,通过 out chan<- int 类型的 Channel 向外界发送数据。consumer 函数作为消费者,通过 in <-chan int 类型的 Channel 从外界接收数据。main 函数中创建了一个 Channel 并启动了生产者和消费者的 goroutine。select {} 语句用于阻塞 main 函数,防止程序过早退出。

扇入(Fan - In)模式

扇入模式允许将多个输入 Channel 的数据合并到一个输出 Channel 中。这在需要同时处理多个数据源的情况下非常有用。

package main

import (
    "fmt"
)

// 将多个输入 channel 的数据合并到一个输出 channel
func fanIn(inputs []<-chan int, out chan<- int) {
    var wg sync.WaitGroup
    wg.Add(len(inputs))

    for _, in := range inputs {
        go func(c <-chan int) {
            defer wg.Done()
            for val := range c {
                out <- val
            }
        }(in)
    }

    go func() {
        wg.Wait()
        close(out)
    }()
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)

    go func() {
        for i := 0; i < 5; i++ {
            ch1 <- i
        }
        close(ch1)
    }()

    go func() {
        for i := 5; i < 10; i++ {
            ch2 <- i
        }
        close(ch2)
    }()

    output := make(chan int)
    go fanIn([]<-chan int{ch1, ch2}, output)

    for val := range output {
        fmt.Println(val)
    }
}

在上述代码中,fanIn 函数接收一个输入 Channel 的切片和一个输出 Channel。它为每个输入 Channel 启动一个 goroutine,将数据发送到输出 Channel 中。main 函数中创建了两个输入 Channel,并分别向它们发送数据。然后通过 fanIn 函数将这两个 Channel 的数据合并到一个输出 Channel 中,并在 main 函数中消费这个输出 Channel 的数据。

扇出(Fan - Out)模式

扇出模式是将一个输入 Channel 的数据分发到多个输出 Channel 中,常用于需要并行处理数据的场景。

package main

import (
    "fmt"
)

// 将一个输入 channel 的数据分发到多个输出 channel
func fanOut(in <-chan int, numOutputs int) []chan int {
    var outputs []chan int
    for i := 0; i < numOutputs; i++ {
        output := make(chan int)
        outputs = append(outputs, output)
        go func(out chan int) {
            for val := range in {
                out <- val
            }
            close(out)
        }(output)
    }
    return outputs
}

func main() {
    input := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            input <- i
        }
        close(input)
    }()

    numOutputs := 3
    outputs := fanOut(input, numOutputs)

    for i := 0; i < numOutputs; i++ {
        go func(output chan int) {
            for val := range output {
                fmt.Printf("Output %d: %d\n", i, val)
            }
        }(outputs[i])
    }

    select {}
}

在上述代码中,fanOut 函数接收一个输入 Channel 和输出 Channel 的数量。它为每个输出 Channel 启动一个 goroutine,将输入 Channel 的数据分发到各个输出 Channel 中。main 函数中创建了一个输入 Channel 并向其发送数据,然后通过 fanOut 函数将数据分发到多个输出 Channel 中,并为每个输出 Channel 启动一个 goroutine 来处理数据。

使用 WaitGroup 管理并发任务

简单的 WaitGroup 使用示例

WaitGroup 是 Go 语言标准库中用于等待一组 goroutine 完成的工具。它提供了一种简单的方式来同步多个并发任务。

package main

import (
    "fmt"
    "sync"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d started\n", id)
    // 模拟一些工作
    for i := 0; i < 5; i++ {
        fmt.Printf("Worker %d: %d\n", id, i)
    }
    fmt.Printf("Worker %d finished\n", id)
}

func main() {
    var wg sync.WaitGroup
    numWorkers := 3

    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }

    wg.Wait()
    fmt.Println("All workers finished")
}

在上述代码中,worker 函数表示一个工作任务,它在开始时调用 wg.Add(1) 来增加 WaitGroup 的计数,在结束时调用 wg.Done() 来减少计数。main 函数中启动了多个 worker goroutine,并通过 wg.Wait() 等待所有 goroutine 完成。

复杂任务中的 WaitGroup 应用

在实际应用中,可能会有更复杂的任务依赖关系,WaitGroup 同样可以有效地管理这些并发任务。

package main

import (
    "fmt"
    "sync"
)

func task1(wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Println("Task 1 started")
    // 模拟任务执行
    for i := 0; i < 3; i++ {
        fmt.Println("Task 1: ", i)
    }
    fmt.Println("Task 1 finished")
}

func task2(wg *sync.WaitGroup, task1Wg *sync.WaitGroup) {
    task1Wg.Wait()
    defer wg.Done()
    fmt.Println("Task 2 started")
    // 模拟任务执行
    for i := 0; i < 2; i++ {
        fmt.Println("Task 2: ", i)
    }
    fmt.Println("Task 2 finished")
}

func main() {
    var mainWg sync.WaitGroup
    var task1Wg sync.WaitGroup

    mainWg.Add(2)
    task1Wg.Add(1)

    go task1(&task1Wg)
    go task2(&mainWg, &task1Wg)

    mainWg.Wait()
    fmt.Println("All tasks finished")
}

在这个例子中,task2 依赖于 task1 的完成。task1 启动时增加 task1Wg 的计数,完成时减少计数。task2 在开始前通过 task1Wg.Wait() 等待 task1 完成,然后增加 mainWg 的计数,完成时减少计数。main 函数通过 mainWg.Wait() 等待 task1task2 都完成。

基于 Context 的并发控制

Context 基本使用

Context 是 Go 1.7 引入的一个用于控制并发操作的重要工具,它可以用于取消操作、设置截止时间等。

package main

import (
    "context"
    "fmt"
    "time"
)

func worker(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Worker received cancel signal")
            return
        default:
            fmt.Println("Worker is working")
            time.Sleep(1 * time.Second)
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    go worker(ctx)

    time.Sleep(5 * time.Second)
}

在上述代码中,worker 函数通过 select 语句监听 ctx.Done() 信号。main 函数中创建了一个带有 3 秒超时的 Context,并启动了 worker goroutine。3 秒后,Context 会自动取消,worker 函数接收到取消信号后退出。

Context 传递与取消

Context 可以在多个 goroutine 之间传递,实现级联取消。

package main

import (
    "context"
    "fmt"
    "time"
)

func subWorker(ctx context.Context, id int) {
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Sub - Worker %d received cancel signal\n", id)
            return
        default:
            fmt.Printf("Sub - Worker %d is working\n", id)
            time.Sleep(1 * time.Second)
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    for i := 1; i <= 3; i++ {
        go subWorker(ctx, i)
    }

    time.Sleep(5 * time.Second)
}

在这个例子中,main 函数创建了一个带有超时的 Context,并启动了多个 subWorker goroutine。当 Context 取消时,所有的 subWorker 都会接收到取消信号并退出。

Context 与 Channel 结合使用

Context 常常与 Channel 一起使用,以实现更灵活的并发控制。

package main

import (
    "context"
    "fmt"
    "time"
)

func producer(ctx context.Context, out chan<- int) {
    for i := 0; ; i++ {
        select {
        case <-ctx.Done():
            close(out)
            return
        default:
            out <- i
            time.Sleep(1 * time.Second)
        }
    }
}

func consumer(ctx context.Context, in <-chan int) {
    for {
        select {
        case <-ctx.Done():
            return
        case num, ok := <-in:
            if!ok {
                return
            }
            fmt.Println("Consumed:", num)
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    ch := make(chan int)

    go producer(ctx, ch)
    go consumer(ctx, ch)

    select {}
}

在上述代码中,producer 函数在 Context 取消时关闭输出 Channel,consumer 函数在 Context 取消或 Channel 关闭时退出。main 函数中创建了 Context、Channel,并启动了生产者和消费者 goroutine。

互斥锁与读写锁的应用

互斥锁(Mutex)的使用

互斥锁(Mutex)用于保护共享资源,确保在同一时间只有一个 goroutine 可以访问该资源。

package main

import (
    "fmt"
    "sync"
)

type Counter struct {
    value int
    mu    sync.Mutex
}

func (c *Counter) Increment() {
    c.mu.Lock()
    c.value++
    c.mu.Unlock()
}

func (c *Counter) Get() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

func main() {
    var wg sync.WaitGroup
    counter := Counter{}

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                counter.Increment()
            }
        }()
    }

    wg.Wait()
    fmt.Println("Final counter value:", counter.Get())
}

在上述代码中,Counter 结构体包含一个 int 类型的计数器和一个 MutexIncrement 方法和 Get 方法在访问共享资源 value 时,通过 LockUnlock 方法来保护资源,防止竞态条件。

读写锁(RWMutex)的应用

读写锁(RWMutex)适用于读多写少的场景,允许多个 goroutine 同时进行读操作,但只允许一个 goroutine 进行写操作。

package main

import (
    "fmt"
    "sync"
    "time"
)

type Data struct {
    value int
    mu    sync.RWMutex
}

func (d *Data) Read() int {
    d.mu.RLock()
    defer d.mu.RUnlock()
    return d.value
}

func (d *Data) Write(newValue int) {
    d.mu.Lock()
    defer d.mu.Unlock()
    d.value = newValue
    time.Sleep(2 * time.Second)
}

func main() {
    var wg sync.WaitGroup
    data := Data{}

    // 启动多个读操作
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            fmt.Println("Read value:", data.Read())
        }()
    }

    // 启动写操作
    wg.Add(1)
    go func() {
        defer wg.Done()
        data.Write(42)
        fmt.Println("Write operation completed")
    }()

    wg.Wait()
}

在上述代码中,Data 结构体包含一个 int 类型的数据和一个 RWMutexRead 方法使用 RLockRUnlock 进行读操作,允许多个 goroutine 同时读取。Write 方法使用 LockUnlock 进行写操作,在写操作时会阻止其他读和写操作。

并发安全的单例模式

双重检查锁定(DCL)实现单例

在 Go 语言中实现并发安全的单例模式,可以使用双重检查锁定的方式。

package main

import (
    "fmt"
    "sync"
)

type Singleton struct {
    data string
}

var instance *Singleton
var once sync.Once

func GetInstance() *Singleton {
    once.Do(func() {
        instance = &Singleton{
            data: "Initial data",
        }
    })
    return instance
}

func main() {
    var wg sync.WaitGroup
    var instances []*Singleton

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            instance := GetInstance()
            instances = append(instances, instance)
        }()
    }

    wg.Wait()

    for _, inst := range instances {
        fmt.Println(inst == instances[0])
    }
}

在上述代码中,once 是一个 sync.Once 类型的变量,GetInstance 函数通过 once.Do 方法确保 instance 只被初始化一次。即使多个 goroutine 同时调用 GetInstance 函数,也只会创建一个 Singleton 实例。

基于 sync.Map 的单例

sync.Map 是 Go 1.9 引入的一个并发安全的键值对集合,也可以用于实现单例模式。

package main

import (
    "fmt"
    "sync"
)

type Singleton struct {
    data string
}

var singletonMap sync.Map

func GetInstance() *Singleton {
    if v, ok := singletonMap.Load("singleton"); ok {
        return v.(*Singleton)
    }

    newInstance := &Singleton{
        data: "Initial data",
    }
    singletonMap.Store("singleton", newInstance)
    return newInstance
}

func main() {
    var wg sync.WaitGroup
    var instances []*Singleton

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            instance := GetInstance()
            instances = append(instances, instance)
        }()
    }

    wg.Wait()

    for _, inst := range instances {
        fmt.Println(inst == instances[0])
    }
}

在这个例子中,singletonMap 是一个 sync.MapGetInstance 函数首先尝试从 singletonMap 中加载单例实例,如果不存在则创建一个新的实例并存储到 singletonMap 中。这种方式同样可以保证在并发环境下单例实例的唯一性。

分布式并发逻辑封装

使用 gRPC 实现分布式并发

gRPC 是一个高性能、开源和通用的 RPC 框架,基于 HTTP/2 协议标准设计,在分布式系统中常用于服务间的通信。下面是一个简单的 gRPC 示例,展示如何在分布式环境中实现并发逻辑。

  1. 定义服务和消息
syntax = "proto3";

package example;

service MyService {
    rpc Process(Request) returns (Response);
}

message Request {
    int32 value = 1;
}

message Response {
    int32 result = 1;
}
  1. 生成 Go 代码 使用 protoc 工具生成 Go 代码:
protoc --go_out=plugins=grpc:. example.proto
  1. 实现服务端
package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "log"
    "net"

    pb "github.com/yourpath/example"
)

type server struct{}

func (s *server) Process(ctx context.Context, req *pb.Request) (*pb.Response, error) {
    result := req.Value * 2
    return &pb.Response{Result: result}, nil
}

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err!= nil {
        log.Fatalf("failed to listen: %v", err)
    }
    s := grpc.NewServer()
    pb.RegisterMyServiceServer(s, &server{})
    if err := s.Serve(lis); err!= nil {
        log.Fatalf("failed to serve: %v", err)
    }
}
  1. 实现客户端
package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"

    pb "github.com/yourpath/example"
)

func main() {
    conn, err := grpc.Dial(":50051", grpc.WithInsecure())
    if err!= nil {
        fmt.Printf("did not connect: %v", err)
        return
    }
    defer conn.Close()
    c := pb.NewMyServiceClient(conn)

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    req := &pb.Request{Value: 5}
    resp, err := c.Process(ctx, req)
    if err!= nil {
        fmt.Printf("could not greet: %v", err)
        return
    }
    fmt.Printf("Result: %d\n", resp.Result)
}

在这个例子中,服务端实现了 Process 方法,客户端通过 gRPC 调用该方法。在分布式环境中,多个客户端可以并发地调用服务端的方法,实现分布式并发处理。

使用 etcd 进行分布式协调

etcd 是一个高可用的键值存储系统,常用于分布式系统中的服务发现、配置管理和分布式锁等。下面展示如何使用 etcd 实现分布式锁,从而实现分布式并发逻辑的协调。

  1. 安装 etcd 客户端库
go get go.etcd.io/etcd/clientv3
  1. 实现分布式锁
package main

import (
    "context"
    "fmt"
    "go.etcd.io/etcd/clientv3"
    "time"
)

func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err!= nil {
        fmt.Printf("failed to connect to etcd: %v", err)
        return
    }
    defer cli.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    resp, err := cli.Grant(ctx, 10)
    cancel()
    if err!= nil {
        fmt.Printf("failed to grant lease: %v", err)
        return
    }

    leaseID := resp.ID
    ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
    _, err = cli.Put(ctx, "/lock", "locked", clientv3.WithLease(leaseID))
    cancel()
    if err!= nil {
        fmt.Printf("failed to put lock: %v", err)
        return
    }

    defer func() {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        _, err = cli.Revoke(ctx, leaseID)
        cancel()
        if err!= nil {
            fmt.Printf("failed to revoke lease: %v", err)
        }
    }()

    // 模拟一些工作
    fmt.Println("Lock acquired, doing some work...")
    time.Sleep(3 * time.Second)
    fmt.Println("Work done")
}

在上述代码中,通过 etcd 的租约机制实现了分布式锁。一个客户端获取租约并在 etcd 中设置一个锁键值对,如果设置成功则表示获取到锁,完成工作后撤销租约释放锁。其他客户端在获取锁时如果发现锁键值对已存在,则等待锁释放。这样就实现了分布式环境下的并发协调。

通过以上这些设计模式和工具,在 Go 语言中可以有效地封装并发逻辑,提高程序的性能和可维护性,无论是在单机环境还是分布式环境中都能很好地应对并发挑战。