MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Go并发范式在不同业务的适配

2021-11-303.3k 阅读

Go并发编程基础

Goroutine

在Go语言中,goroutine是实现并发的核心机制。它类似于线程,但比线程更轻量级。一个程序可以轻松创建成千上万的goroutine,而不会消耗过多资源。

以下是一个简单的goroutine示例:

package main

import (
    "fmt"
    "time"
)

func say(s string) {
    for i := 0; i < 3; i++ {
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func main() {
    go say("world")
    say("hello")
}

在上述代码中,go say("world")启动了一个新的goroutine来执行say("world")函数,而say("hello")则在主goroutine中执行。这两个goroutine是并发执行的。

Channel

channel是Go语言中用于在goroutine之间进行通信的机制。它可以看作是一个类型化的管道,数据可以通过它在goroutine之间传递。

以下是一个简单的channel示例:

package main

import (
    "fmt"
)

func sum(s []int, c chan int) {
    sum := 0
    for _, v := range s {
        sum += v
    }
    c <- sum
}

func main() {
    s := []int{7, 2, 8, -9, 4, 0}

    c := make(chan int)
    go sum(s[:len(s)/2], c)
    go sum(s[len(s)/2:], c)
    x, y := <-c, <-c

    fmt.Println(x, y, x+y)
}

在这个例子中,我们创建了一个channel c。然后启动两个goroutine,分别计算切片 s的前半部分和后半部分的和,并将结果通过channel发送。主goroutinechannel中接收这两个结果并计算总和。

不同业务场景下的并发范式适配

数据处理业务

在数据处理业务中,常常需要对大量数据进行并行处理以提高效率。例如,对一个包含数百万条记录的日志文件进行分析,统计特定事件的发生次数。

基于任务分发的并发范式

这种范式适用于可以将数据处理任务拆分成多个独立子任务的场景。

package main

import (
    "fmt"
    "sync"
)

type Task struct {
    data []int
}

func worker(tasks <-chan Task, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for task := range tasks {
        sum := 0
        for _, num := range task.data {
            sum += num
        }
        results <- sum
    }
}

func main() {
    const numWorkers = 3
    tasks := make(chan Task, 10)
    results := make(chan int, 10)
    var wg sync.WaitGroup

    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go worker(tasks, results, &wg)
    }

    // 模拟任务分发
    for i := 0; i < 5; i++ {
        task := Task{data: []int{i, i + 1, i + 2}}
        tasks <- task
    }
    close(tasks)

    go func() {
        wg.Wait()
        close(results)
    }()

    for result := range results {
        fmt.Println("Result:", result)
    }
}

在上述代码中,我们创建了多个worker goroutine,它们从tasks channel中获取任务,处理后将结果发送到results channel。主goroutine负责分发任务,最后收集并打印结果。

流水线并发范式

流水线范式适用于数据处理可以分为多个阶段,且每个阶段可以独立并行执行的场景。例如,数据清洗、特征提取、模型预测这样的机器学习流程。

package main

import (
    "fmt"
    "sync"
)

func stage1(in <-chan int, out chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for num := range in {
        out <- num * 2
    }
    close(out)
}

func stage2(in <-chan int, out chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for num := range in {
        out <- num + 1
    }
    close(out)
}

func main() {
    data := []int{1, 2, 3, 4, 5}
    var wg sync.WaitGroup

    stage1In := make(chan int)
    stage1Out := make(chan int)
    stage2Out := make(chan int)

    wg.Add(1)
    go stage1(stage1In, stage1Out, &wg)

    wg.Add(1)
    go stage2(stage1Out, stage2Out, &wg)

    for _, num := range data {
        stage1In <- num
    }
    close(stage1In)

    go func() {
        wg.Wait()
        close(stage2Out)
    }()

    for result := range stage2Out {
        fmt.Println("Final Result:", result)
    }
}

在这个例子中,我们有两个处理阶段stage1stage2stage1将输入数据翻倍,stage2再将stage1的输出加1。数据在这两个阶段组成的流水线中依次处理。

网络服务业务

在网络服务业务中,如Web服务器、RPC服务器等,需要高效处理大量并发请求。

基于请求-响应模型的并发范式

对于Web服务器,每一个HTTP请求可以看作是一个独立的任务。

package main

import (
    "fmt"
    "net/http"
)

func handler(w http.ResponseWriter, r *http.Request) {
    fmt.Fprintf(w, "Hello, you've requested: %s\n", r.URL.Path)
}

func main() {
    http.HandleFunc("/", handler)
    fmt.Println("Server is listening on :8080")
    http.ListenAndServe(":8080", nil)
}

在上述简单的HTTP服务器示例中,Go语言的http包内部使用goroutine来处理每个HTTP请求。当一个请求到达时,http包会启动一个新的goroutine来执行handler函数,从而实现并发处理多个请求。

基于连接复用的并发范式

在RPC服务中,连接的建立和销毁可能开销较大,因此连接复用可以提高性能。

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "log"
    "net"

    pb "github.com/yourpackage/yourproto"
)

type Server struct {
    pb.UnimplementedYourServiceServer
}

func (s *Server) YourMethod(ctx context.Context, in *pb.Request) (*pb.Response, error) {
    return &pb.Response{Message: "Hello from server"}, nil
}

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    s := grpc.NewServer()
    pb.RegisterYourServiceServer(s, &Server{})

    fmt.Println("Server is listening on :50051")
    if err := s.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

在这个简单的gRPC服务器示例中,grpc.NewServer()创建的服务器会使用goroutine来并发处理每个RPC请求。同时,gRPC在底层会复用TCP连接,减少连接建立和销毁的开销,提高并发处理能力。

分布式系统业务

在分布式系统业务中,需要处理多个节点之间的通信和协同工作。

基于分布式消息队列的并发范式

分布式消息队列如Kafka可以用于在不同节点之间传递消息,实现解耦和异步处理。

package main

import (
    "fmt"
    "github.com/segmentio/kafka-go"
)

func producer(topic string) {
    w := &kafka.Writer{
        Addr:  kafka.TCP("localhost:9092"),
        Topic: topic,
    }
    defer w.Close()

    for i := 0; i < 10; i++ {
        err := w.WriteMessages(kafka.Message{
            Value: []byte(fmt.Sprintf("message %d", i)),
        })
        if err != nil {
            fmt.Println("failed to write message:", err)
        }
    }
}

func consumer(topic string) {
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers:  []string{"localhost:9092"},
        Topic:    topic,
        MaxBytes: 10e6,
    })
    defer r.Close()

    for {
        m, err := r.ReadMessage(nil)
        if err != nil {
            fmt.Println("failed to read message:", err)
            break
        }
        fmt.Println("message at offset", m.Offset, string(m.Value))
    }
}

func main() {
    topic := "test-topic"

    go producer(topic)
    go consumer(topic)

    select {}
}

在上述代码中,我们使用kafka-go库实现了一个简单的Kafka生产者和消费者。生产者向Kafka主题发送消息,消费者从主题中读取消息。不同节点上的生产者和消费者可以通过Kafka进行异步通信,实现分布式系统中的并发处理。

基于分布式锁的并发范式

在分布式系统中,有时需要保证某些操作在全局范围内的唯一性,例如分布式系统中的资源分配。

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
    "time"
)

var ctx = context.Background()

func acquireLock(client *redis.Client, lockKey string, lockValue string, expiration time.Duration) bool {
    set, err := client.SetNX(ctx, lockKey, lockValue, expiration).Result()
    if err != nil {
        fmt.Println("Error acquiring lock:", err)
        return false
    }
    return set
}

func releaseLock(client *redis.Client, lockKey string, lockValue string) {
    script := `
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        else
            return 0
        end
    `
    _, err := client.Eval(ctx, script, []string{lockKey}, lockValue).Int64()
    if err != nil {
        fmt.Println("Error releasing lock:", err)
    }
}

func main() {
    client := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        Password: "",
        DB:       0,
    })

    lockKey := "resource-lock"
    lockValue := "unique-value"
    expiration := 10 * time.Second

    if acquireLock(client, lockKey, lockValue, expiration) {
        fmt.Println("Lock acquired, performing critical operation...")
        // 执行关键操作
        time.Sleep(5 * time.Second)
        releaseLock(client, lockKey, lockValue)
        fmt.Println("Lock released")
    } else {
        fmt.Println("Failed to acquire lock")
    }
}

在这个基于Redis实现的分布式锁示例中,不同节点通过竞争获取Redis中的锁来执行关键操作。只有获取到锁的节点才能执行,从而保证了操作在分布式系统中的唯一性和并发控制。

实时数据处理业务

在实时数据处理业务中,如实时监控、实时数据分析等,需要及时处理源源不断的数据流。

基于发布-订阅模型的并发范式

发布-订阅模型可以让多个订阅者同时接收实时数据。

package main

import (
    "fmt"
    "sync"
)

type Publisher struct {
    subscribers map[string]chan string
    mu          sync.RWMutex
}

func NewPublisher() *Publisher {
    return &Publisher{
        subscribers: make(map[string]chan string),
    }
}

func (p *Publisher) Subscribe(subscriberID string) chan string {
    p.mu.Lock()
    defer p.mu.Unlock()
    ch := make(chan string)
    p.subscribers[subscriberID] = ch
    return ch
}

func (p *Publisher) Unsubscribe(subscriberID string) {
    p.mu.Lock()
    defer p.mu.Unlock()
    if ch, ok := p.subscribers[subscriberID]; ok {
        close(ch)
        delete(p.subscribers, subscriberID)
    }
}

func (p *Publisher) Publish(message string) {
    p.mu.RLock()
    defer p.mu.RUnlock()
    for _, ch := range p.subscribers {
        ch <- message
    }
}

func main() {
    pub := NewPublisher()

    sub1 := pub.Subscribe("sub1")
    sub2 := pub.Subscribe("sub2")

    go func() {
        for msg := range sub1 {
            fmt.Println("Subscriber 1 received:", msg)
        }
    }()

    go func() {
        for msg := range sub2 {
            fmt.Println("Subscriber 2 received:", msg)
        }
    }()

    pub.Publish("Hello, subscribers!")

    pub.Unsubscribe("sub1")

    pub.Publish("This is for sub2 only")

    time.Sleep(1 * time.Second)
}

在上述代码中,Publisher负责发布消息,多个Subscriber通过订阅Publisher来接收消息。当Publisher发布消息时,所有订阅者的channel都会收到该消息,实现了实时数据的并发处理。

基于流处理的并发范式

流处理适用于对连续数据流进行实时处理的场景,例如实时监控系统中对传感器数据的处理。

package main

import (
    "fmt"
    "time"
)

func streamProcessor(data <-chan int) {
    for num := range data {
        result := num * num
        fmt.Println("Processed result:", result)
    }
}

func main() {
    dataStream := make(chan int)

    go streamProcessor(dataStream)

    for i := 1; i <= 5; i++ {
        dataStream <- i
        time.Sleep(1 * time.Second)
    }
    close(dataStream)

    time.Sleep(2 * time.Second)
}

在这个简单的流处理示例中,dataStream模拟了一个数据流,streamProcessor goroutine不断从数据流中读取数据并进行处理。通过这种方式,可以实时处理源源不断的数据。

并发范式选择与优化

选择合适的并发范式

  1. 根据业务需求:如果业务是数据处理且可以拆分成独立任务,任务分发范式可能更合适;如果是网络服务,请求 - 响应模型可能是首选。
  2. 考虑数据依赖:流水线范式适用于数据处理有明显阶段且前一阶段输出是后一阶段输入的情况;而发布 - 订阅模型适用于数据无直接依赖,多个组件需要同时获取数据的场景。
  3. 资源限制:如果资源有限,如内存、CPU等,需要选择更轻量级的并发范式。例如,在分布式系统中,基于分布式锁的范式可能比基于分布式消息队列的范式对资源要求更低。

并发优化

  1. 减少锁争用:在涉及共享资源的并发操作中,尽量减少锁的使用范围和时间。例如,可以使用无锁数据结构(如sync.Map在Go 1.9及以上版本中)来提高并发性能。
  2. 优化channel使用:合理设置channel的缓冲区大小,避免不必要的阻塞。如果channel作为同步机制,无缓冲channel可能更合适;如果用于数据传输,适当大小的缓冲channel可以提高性能。
  3. 负载均衡:在任务分发场景中,确保任务在goroutine之间均匀分配,避免某个goroutine负载过重。可以采用动态任务分配策略,根据goroutine的当前负载情况分配任务。

通过合理选择并发范式并进行优化,可以充分发挥Go语言的并发优势,提高不同业务场景下的系统性能和效率。在实际应用中,需要根据具体业务需求、数据特征和资源情况进行综合考虑和调整。