MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

go 语言中的 goroutine 实战技巧

2022-09-091.5k 阅读

理解 goroutine 基础

在 Go 语言中,goroutine 是实现并发编程的核心机制。它是一种轻量级的线程,由 Go 运行时(runtime)管理调度。与传统线程相比,goroutine 的创建和销毁开销极小,这使得我们可以轻松创建数以万计的 goroutine 来处理并发任务。

简单的 goroutine 示例

以下是一个简单的示例,展示如何创建和运行一个 goroutine:

package main

import (
    "fmt"
    "time"
)

func sayHello() {
    fmt.Println("Hello, goroutine!")
}

func main() {
    go sayHello()
    time.Sleep(1 * time.Second)
    fmt.Println("Main function is done.")
}

在上述代码中,go sayHello() 语句创建并启动了一个新的 goroutine 来执行 sayHello 函数。主函数 main 并不会等待 sayHello 函数执行完毕,而是继续向下执行。由于 sayHello 函数在新的 goroutine 中异步执行,我们需要在主函数中添加 time.Sleep 来确保 sayHello 函数有足够的时间执行。如果不添加 time.Sleep,主函数可能在 sayHello 函数执行之前就结束了,导致 sayHello 函数中的打印语句无法输出。

goroutine 与并发模式

生产者 - 消费者模式

生产者 - 消费者模式是一种经典的并发模式,在 Go 语言中利用 goroutine 和通道(channel)可以非常优雅地实现。

package main

import (
    "fmt"
)

func producer(ch chan int) {
    for i := 0; i < 5; i++ {
        ch <- i
        fmt.Printf("Produced: %d\n", i)
    }
    close(ch)
}

func consumer(ch chan int) {
    for val := range ch {
        fmt.Printf("Consumed: %d\n", val)
    }
}

func main() {
    ch := make(chan int)

    go producer(ch)
    go consumer(ch)

    select {}
}

在这个示例中,producer 函数是生产者,它向通道 ch 发送数据。consumer 函数是消费者,它从通道 ch 接收数据。producer 函数在发送完数据后关闭通道,consumer 函数通过 for... range 循环从通道接收数据,当通道关闭时,循环会自动结束。select {} 语句在 main 函数中,用于阻塞主函数,防止主函数提前退出,确保生产者和消费者 goroutine 有足够的时间完成任务。

扇入(Fan - In)模式

扇入模式是指将多个输入源的数据合并到一个输出通道。例如,假设有多个 goroutine 同时生成数据,我们希望将这些数据收集到一个通道中进行统一处理。

package main

import (
    "fmt"
)

func generateData(id int, ch chan int) {
    for i := id * 10; i < (id + 1) * 10; i++ {
        ch <- i
    }
    close(ch)
}

func fanIn(inputs []<-chan int, output chan<- int) {
    var done = make(chan struct{})
    defer close(done)

    for _, in := range inputs {
        go func(c <-chan int) {
            for val := range c {
                select {
                case output <- val:
                case <-done:
                    return
                }
            }
        }(in)
    }
}

func main() {
    var inputs []<-chan int
    for i := 0; i < 3; i++ {
        ch := make(chan int)
        go generateData(i, ch)
        inputs = append(inputs, ch)
    }

    output := make(chan int)
    go fanIn(inputs, output)

    for val := range output {
        fmt.Println(val)
    }
}

在上述代码中,generateData 函数模拟数据生成,每个 generateData 函数在不同的 goroutine 中运行,生成不同范围的数据并发送到各自的通道。fanIn 函数负责将多个输入通道的数据合并到一个输出通道。fanIn 函数使用了一个 done 通道来控制 goroutine 的退出,以确保在主函数结束时所有 goroutine 能正确退出。

扇出(Fan - Out)模式

扇出模式与扇入模式相反,它将一个输入源的数据分发给多个处理单元。例如,我们有一个通道接收数据,希望将这些数据分发给多个 goroutine 进行并行处理。

package main

import (
    "fmt"
)

func worker(id int, input <-chan int) {
    for val := range input {
        fmt.Printf("Worker %d processed: %d\n", id, val)
    }
}

func fanOut(input <-chan int, numWorkers int) {
    for i := 0; i < numWorkers; i++ {
        go worker(i, input)
    }
}

func main() {
    input := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            input <- i
        }
        close(input)
    }()

    fanOut(input, 3)

    select {}
}

在这个示例中,worker 函数是处理数据的单元,fanOut 函数创建多个 worker goroutine 并将输入通道的数据分发给它们。主函数中,先向输入通道发送数据,然后调用 fanOut 函数启动多个工作 goroutine 来处理数据。同样,select {} 用于阻塞主函数,确保工作 goroutine 有足够时间处理完数据。

goroutine 的调度与资源管理

goroutine 调度器原理

Go 语言的运行时包含一个高效的 goroutine 调度器。调度器采用 M:N 调度模型,即多个 goroutine 映射到多个操作系统线程上。调度器主要由三个组件组成:M(操作系统线程)、G(goroutine)和 P(处理器)。

M:代表操作系统线程,它负责执行 goroutine 的代码。一个 M 可以运行多个 G,但在同一时刻只能运行一个 G。

G:代表 goroutine,它包含了 goroutine 的代码、栈以及其他运行时信息。

P:代表处理器,它是一个资源,用于绑定 M 和 G。P 维护了一个本地的 G 队列,M 从 P 的本地队列或全局队列中获取 G 来执行。

调度器的工作流程大致如下:

  1. 当一个 goroutine 被创建时,它被放入全局队列或某个 P 的本地队列中。
  2. M 从 P 的本地队列中获取 G 来执行,如果本地队列为空,则尝试从全局队列或其他 P 的本地队列中窃取 G。
  3. 当一个 G 执行系统调用或进入阻塞状态时,M 会将 G 从运行状态切换到阻塞状态,并从队列中获取另一个 G 来执行。当 G 阻塞完成后,它会被重新放入队列中等待执行。

避免 goroutine 泄漏

在编写并发程序时,很容易出现 goroutine 泄漏的问题。goroutine 泄漏是指 goroutine 持续运行,但不再有任何方式可以与之交互或终止它,从而浪费系统资源。

例如,以下代码就存在 goroutine 泄漏的风险:

package main

import (
    "fmt"
)

func riskyFunction() {
    ch := make(chan int)
    go func() {
        for {
            select {
            case val := <-ch:
                fmt.Println(val)
            }
        }
    }()
    // 没有关闭通道 ch,导致 goroutine 泄漏
}

func main() {
    riskyFunction()
}

在上述代码中,riskyFunction 函数创建了一个 goroutine 来从通道 ch 接收数据,但没有关闭通道 ch。这意味着这个 goroutine 会一直阻塞在 select 语句中,无法终止,从而导致 goroutine 泄漏。

为了避免 goroutine 泄漏,我们需要确保在合适的时机关闭通道或提供一种方式来终止 goroutine。例如:

package main

import (
    "fmt"
    "time"
)

func safeFunction() {
    ch := make(chan int)
    go func() {
        for {
            select {
            case val, ok := <-ch:
                if!ok {
                    return
                }
                fmt.Println(val)
            }
        }
    }()

    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch)
    time.Sleep(1 * time.Second)
}

func main() {
    safeFunction()
}

在这个改进后的代码中,safeFunction 函数在向通道发送完数据后关闭了通道。在 goroutine 中,通过检查 ok 标志来判断通道是否关闭,当通道关闭时,okfalse,从而可以安全地退出 goroutine,避免了 goroutine 泄漏。

使用 sync 包控制 goroutine 同步

sync.Mutex 互斥锁

在并发编程中,多个 goroutine 可能同时访问共享资源,这可能导致数据竞争和不一致的问题。sync.Mutex 是 Go 语言提供的一种简单有效的同步机制,用于保护共享资源,确保同一时刻只有一个 goroutine 可以访问共享资源。

package main

import (
    "fmt"
    "sync"
)

var (
    counter int
    mutex   sync.Mutex
)

func increment(wg *sync.WaitGroup) {
    defer wg.Done()
    mutex.Lock()
    counter++
    mutex.Unlock()
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go increment(&wg)
    }
    wg.Wait()
    fmt.Println("Final counter value:", counter)
}

在上述代码中,counter 是共享资源,mutex 是用于保护 counter 的互斥锁。在 increment 函数中,通过调用 mutex.Lock() 来获取锁,确保在修改 counter 时不会有其他 goroutine同时访问。修改完成后,调用 mutex.Unlock() 释放锁。sync.WaitGroup 用于等待所有 goroutine 完成任务,确保在输出 counter 的最终值时,所有的递增操作都已完成。

sync.RWMutex 读写锁

在某些情况下,共享资源的读取操作远远多于写入操作。如果每次读取操作都使用互斥锁,会降低程序的并发性能。sync.RWMutex 是一种读写锁,允许多个 goroutine 同时进行读取操作,但只允许一个 goroutine 进行写入操作。

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    data    = make(map[string]string)
    rwMutex sync.RWMutex
)

func read(key string) string {
    rwMutex.RLock()
    value := data[key]
    rwMutex.RUnlock()
    return value
}

func write(key, value string) {
    rwMutex.Lock()
    data[key] = value
    rwMutex.Unlock()
}

func main() {
    go func() {
        for i := 0; i < 10; i++ {
            write(fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i))
            time.Sleep(100 * time.Millisecond)
        }
    }()

    var wg sync.WaitGroup
    for i := 0; i < 50; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            key := fmt.Sprintf("key%d", id%10)
            fmt.Printf("Goroutine %d read: %s\n", id, read(key))
        }(i)
    }
    wg.Wait()
}

在这个示例中,data 是共享的 map,rwMutex 是读写锁。read 函数使用 rwMutex.RLock() 进行读锁定,允许多个 goroutine 同时读取。write 函数使用 rwMutex.Lock() 进行写锁定,确保在写入时没有其他 goroutine 进行读写操作。通过这种方式,既保证了数据的一致性,又提高了并发读取的性能。

sync.Cond 条件变量

sync.Cond 用于协调多个 goroutine 在共享资源状态变化时的行为。它通常与 sync.Mutex 配合使用。

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    mu    sync.Mutex
    cond  sync.Cond
    ready bool
)

func waiter() {
    mu.Lock()
    for!ready {
        cond.Wait()
    }
    fmt.Println("Waiter is notified and ready to proceed.")
    mu.Unlock()
}

func notifier() {
    time.Sleep(2 * time.Second)
    mu.Lock()
    ready = true
    fmt.Println("Notifier is setting ready to true and notifying.")
    cond.Broadcast()
    mu.Unlock()
}

func main() {
    cond.L = &mu
    go waiter()
    go notifier()

    select {}
}

在上述代码中,waiter 函数在 readyfalse 时调用 cond.Wait() 进入等待状态,并释放 mu 锁。notifier 函数在等待 2 秒后,设置 readytrue,然后调用 cond.Broadcast() 通知所有等待的 goroutine。waiter 函数在收到通知后重新获取 mu 锁,并检查 ready 状态,满足条件后继续执行。

使用 context 控制 goroutine 生命周期

在实际应用中,我们经常需要在外部控制 goroutine 的执行,例如在程序退出时取消正在运行的 goroutine,或者设置 goroutine 的执行超时。Go 语言的 context 包提供了一种优雅的方式来实现这些功能。

取消 goroutine

package main

import (
    "context"
    "fmt"
    "time"
)

func longRunningTask(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Task cancelled.")
            return
        default:
            fmt.Println("Task is running...")
            time.Sleep(1 * time.Second)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    go longRunningTask(ctx)

    time.Sleep(3 * time.Second)
    cancel()

    select {}
}

在上述代码中,context.WithCancel 创建了一个可取消的上下文 ctx 和取消函数 cancellongRunningTask 函数通过 select 语句监听 ctx.Done() 通道,当该通道接收到数据时,说明上下文被取消,任务应立即结束。在 main 函数中,3 秒后调用 cancel 函数取消上下文,从而终止 longRunningTask goroutine。

设置超时

package main

import (
    "context"
    "fmt"
    "time"
)

func taskWithTimeout(ctx context.Context) {
    select {
    case <-ctx.Done():
        fmt.Println("Task timed out.")
        return
    case <-time.After(5 * time.Second):
        fmt.Println("Task completed within timeout.")
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    go taskWithTimeout(ctx)

    select {}
}

在这个示例中,context.WithTimeout 创建了一个带有超时的上下文 ctx,超时时间为 3 秒。taskWithTimeout 函数通过 select 语句监听 ctx.Done() 通道和 time.After 通道。如果 ctx.Done() 通道先接收到数据,说明任务超时;如果 time.After 通道先接收到数据,说明任务在超时前完成。

goroutine 性能优化

减少锁的竞争

锁的竞争会严重影响程序的并发性能。为了减少锁的竞争,可以采用以下几种方法:

  1. 减小锁的粒度:尽量缩小锁保护的代码范围,只在真正需要保护共享资源的地方加锁。
  2. 使用读写锁:如前面提到的 sync.RWMutex,在读取操作多的情况下,使用读写锁可以提高并发性能。
  3. 分段锁:将共享资源分成多个部分,每个部分使用独立的锁进行保护。这样不同的 goroutine 可以同时访问不同部分的资源,减少锁的竞争。

合理使用 goroutine 数量

虽然 goroutine 是轻量级的,但创建过多的 goroutine 也会带来性能问题。过多的 goroutine 会增加调度器的负担,导致频繁的上下文切换,降低程序的整体性能。在实际应用中,需要根据系统资源和任务特点合理设置 goroutine 的数量。例如,可以使用 sync.WaitGroup 和通道来限制并发执行的 goroutine 数量。

package main

import (
    "fmt"
    "sync"
)

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        fmt.Printf("Worker %d started job %d\n", id, job)
        result := job * job
        fmt.Printf("Worker %d finished job %d with result %d\n", id, job, result)
        results <- result
    }
}

func main() {
    const numJobs = 10
    const numWorkers = 3

    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)
    var wg sync.WaitGroup

    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go worker(i, jobs, results, &wg)
    }

    for i := 0; i < numJobs; i++ {
        jobs <- i
    }
    close(jobs)

    go func() {
        wg.Wait()
        close(results)
    }()

    for result := range results {
        fmt.Println("Result:", result)
    }
}

在上述代码中,通过设置 numWorkers 来控制同时执行的 goroutine 数量,避免创建过多的 goroutine。

优化内存分配

在 goroutine 中频繁进行内存分配也会影响性能。可以通过以下方法优化内存分配:

  1. 复用对象:尽量复用已有的对象,减少新对象的创建。例如,使用对象池(如 sync.Pool)来缓存和复用临时对象。
  2. 减少不必要的中间变量:在编写代码时,尽量减少中间变量的使用,避免不必要的内存分配。

错误处理与 goroutine

在并发编程中,错误处理尤为重要。由于 goroutine 是异步执行的,错误处理需要特别小心。

在 goroutine 中返回错误

package main

import (
    "fmt"
    "errors"
)

func divide(a, b int) (int, error) {
    if b == 0 {
        return 0, errors.New("division by zero")
    }
    return a / b, nil
}

func main() {
    resultCh := make(chan int, 1)
    errCh := make(chan error, 1)

    go func() {
        result, err := divide(10, 0)
        if err != nil {
            errCh <- err
            return
        }
        resultCh <- result
    }()

    select {
    case result := <-resultCh:
        fmt.Println("Result:", result)
    case err := <-errCh:
        fmt.Println("Error:", err)
    }
}

在上述代码中,divide 函数可能返回错误。在 goroutine 中调用 divide 函数,如果发生错误,将错误发送到 errCh 通道;如果没有错误,将结果发送到 resultCh 通道。在 main 函数中,通过 select 语句监听这两个通道,根据接收到的数据进行相应的处理。

处理多个 goroutine 的错误

当有多个 goroutine 同时执行并可能返回错误时,我们需要一种机制来收集和处理这些错误。

package main

import (
    "fmt"
    "errors"
    "sync"
)

func worker(id int) (int, error) {
    if id == 2 {
        return 0, errors.New("worker 2 failed")
    }
    return id * 10, nil
}

func main() {
    const numWorkers = 3
    var wg sync.WaitGroup
    results := make([]int, numWorkers)
    var errCount int
    errMutex := sync.Mutex{}

    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            result, err := worker(id)
            if err != nil {
                errMutex.Lock()
                errCount++
                errMutex.Unlock()
                return
            }
            results[id] = result
        }(i)
    }

    wg.Wait()

    if errCount > 0 {
        fmt.Println("Some workers failed.")
    } else {
        fmt.Println("Results:", results)
    }
}

在这个示例中,worker 函数可能返回错误。多个 worker goroutine 同时执行,通过 sync.WaitGroup 等待所有 goroutine 完成。使用 errMutex 来保护 errCount 的并发访问,统计发生错误的 goroutine 数量。最后根据 errCount 的值判断是否有 goroutine 失败,并进行相应的处理。

通过以上对 goroutine 的实战技巧介绍,包括并发模式、调度原理、同步控制、生命周期管理、性能优化以及错误处理等方面,希望能帮助开发者在 Go 语言的并发编程中更加得心应手,编写出高效、稳定的并发程序。