MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

go 并发编程入门指南

2023-09-287.1k 阅读

Go 并发编程基础概念

Go 语言在设计之初就将并发编程作为其核心特性之一,这主要得益于 Go 语言中轻量级线程(goroutine)和通道(channel)的设计。

goroutine

goroutine 是 Go 语言中实现并发的核心概念,它类似于线程,但并不是传统意义上的操作系统线程。goroutine 是由 Go 运行时(runtime)管理的轻量级执行单元。与操作系统线程相比,创建和销毁 goroutine 的开销非常小,这使得我们可以轻松地创建数以万计的 goroutine。

以下是一个简单的示例,展示如何创建和启动一个 goroutine:

package main

import (
    "fmt"
    "time"
)

func hello() {
    fmt.Println("Hello from goroutine")
}

func main() {
    go hello()
    time.Sleep(time.Second)
    fmt.Println("Main function")
}

在上述代码中,我们通过 go 关键字启动了一个新的 goroutine 来执行 hello 函数。main 函数会继续执行,不会等待 hello 函数完成。为了让程序在 hello 函数执行完毕之前不退出,我们使用 time.Sleepmain 函数休眠一秒钟。

并发与并行

在理解 goroutine 时,很容易混淆并发(concurrency)和并行(parallelism)这两个概念。并发是指在同一时间段内处理多个任务,这些任务可以交替执行,但不一定是同时执行。而并行则是指在同一时刻同时执行多个任务,这需要多个处理器核心的支持。

Go 语言的 goroutine 实现的是并发,在单核心的 CPU 上,多个 goroutine 会通过 Go 运行时的调度器进行分时复用,轮流执行。在多核心的 CPU 上,Go 运行时可以将不同的 goroutine 分配到不同的核心上并行执行,充分利用多核的优势。

通道(channel)

通道是 Go 语言中用于在 goroutine 之间进行通信和同步的重要机制。它提供了一种类型安全的方式来传递数据,避免了传统共享内存并发编程中可能出现的竞态条件(race condition)。

创建通道

通道使用 make 函数来创建,其基本语法如下:

ch := make(chan Type)

这里 Type 是通道中传递的数据类型。例如,创建一个传递整数的通道:

intChan := make(chan int)

还可以创建带缓冲的通道,指定通道的容量:

bufferedChan := make(chan int, 10)

带缓冲的通道在缓冲区未满时,可以无阻塞地发送数据。

发送和接收数据

向通道发送数据使用 <- 操作符,例如:

ch <- value

从通道接收数据也使用 <- 操作符:

value := <-ch

以下是一个完整的示例,展示如何使用通道在两个 goroutine 之间传递数据:

package main

import (
    "fmt"
)

func sender(ch chan int) {
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch)
}

func receiver(ch chan int) {
    for value := range ch {
        fmt.Println("Received:", value)
    }
}

func main() {
    ch := make(chan int)
    go sender(ch)
    go receiver(ch)
    // 防止 main 函数过早退出
    select {}
}

在这个示例中,sender 函数向通道 ch 发送 0 到 4 的整数,然后关闭通道。receiver 函数通过 for... range 循环从通道中接收数据,直到通道关闭。

通道的阻塞与同步

通道的发送和接收操作默认是阻塞的。这意味着当一个 goroutine 尝试向一个已满的通道发送数据时,它会被阻塞,直到有其他 goroutine 从通道中接收数据,腾出空间。同样,当一个 goroutine 尝试从一个空的通道接收数据时,它也会被阻塞,直到有其他 goroutine 向通道发送数据。

这种阻塞特性使得通道可以用于同步 goroutine。例如,我们可以使用一个通道来等待一组 goroutine 完成任务:

package main

import (
    "fmt"
)

func worker(id int, done chan bool) {
    fmt.Printf("Worker %d started\n", id)
    // 模拟工作
    for i := 0; i < 1000000; i++ {
        // 一些计算
    }
    fmt.Printf("Worker %d finished\n", id)
    done <- true
}

func main() {
    numWorkers := 5
    done := make(chan bool, numWorkers)

    for i := 0; i < numWorkers; i++ {
        go worker(i, done)
    }

    for i := 0; i < numWorkers; i++ {
        <-done
    }

    close(done)
    fmt.Println("All workers finished")
}

在这个例子中,每个 worker 函数在完成任务后向 done 通道发送一个 truemain 函数通过从 done 通道接收 numWorkers 次数据,来等待所有的 worker 完成任务。

同步原语

除了通道,Go 语言还提供了一些传统的同步原语,如互斥锁(Mutex)、读写锁(RWMutex)等,用于保护共享资源,防止竞态条件。

互斥锁(Mutex)

互斥锁用于保证在同一时间只有一个 goroutine 可以访问共享资源。在 Go 语言中,使用 sync.Mutex 类型来实现互斥锁。以下是一个简单的示例:

package main

import (
    "fmt"
    "sync"
)

var (
    counter int
    mu      sync.Mutex
)

func increment(wg *sync.WaitGroup) {
    defer wg.Done()
    mu.Lock()
    counter++
    mu.Unlock()
}

func main() {
    var wg sync.WaitGroup
    numRoutines := 1000

    for i := 0; i < numRoutines; i++ {
        wg.Add(1)
        go increment(&wg)
    }

    wg.Wait()
    fmt.Println("Final counter value:", counter)
}

在这个示例中,counter 是一个共享变量,多个 goroutine 尝试对其进行递增操作。通过 mu.Lock()mu.Unlock() 来保护 counter,确保在同一时间只有一个 goroutine 可以修改它,从而避免竞态条件。

读写锁(RWMutex)

读写锁用于区分读操作和写操作。允许多个 goroutine 同时进行读操作,但只允许一个 goroutine 进行写操作,并且在写操作进行时,不允许有读操作。在 Go 语言中,使用 sync.RWMutex 类型来实现读写锁。

以下是一个示例,展示如何使用读写锁:

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    data    = make(map[string]int)
    rwMutex sync.RWMutex
)

func read(key string, wg *sync.WaitGroup) {
    defer wg.Done()
    rwMutex.RLock()
    value, exists := data[key]
    rwMutex.RUnlock()
    if exists {
        fmt.Printf("Read %s: %d\n", key, value)
    } else {
        fmt.Printf("Key %s not found\n", key)
    }
}

func write(key string, value int, wg *sync.WaitGroup) {
    defer wg.Done()
    rwMutex.Lock()
    data[key] = value
    fmt.Printf("Write %s: %d\n", key, value)
    rwMutex.Unlock()
}

func main() {
    var wg sync.WaitGroup

    wg.Add(1)
    go write("key1", 100, &wg)

    time.Sleep(time.Millisecond * 100)

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go read("key1", &wg)
    }

    wg.Wait()
}

在这个示例中,read 函数使用 rwMutex.RLock()rwMutex.RUnlock() 进行读操作,允许多个 goroutine 同时读取。write 函数使用 rwMutex.Lock()rwMutex.Unlock() 进行写操作,保证在写操作时没有其他 goroutine 可以读或写。

上下文(Context)

在并发编程中,经常需要对 goroutine 的生命周期进行控制,例如在程序退出时,优雅地关闭所有正在运行的 goroutine。Go 语言的上下文(Context)包提供了一种机制来实现这一点。

基本概念

上下文是一个携带截止时间、取消信号和其他请求范围值的对象,用于在多个 goroutine 之间传递。上下文通常由父 goroutine 创建,然后传递给它启动的子 goroutine。

Go 语言提供了四种类型的上下文:

  1. context.Background():这是所有上下文的根,通常用于 main 函数、初始化和测试代码。
  2. context.TODO():用于在不确定使用哪种上下文时的占位符。
  3. context.WithCancel(parent Context):创建一个可取消的上下文,返回一个取消函数 CancelFunc,调用该函数可以取消上下文。
  4. context.WithDeadline(parent Context, deadline time.Time)context.WithTimeout(parent Context, timeout time.Duration):分别创建一个带有截止时间和超时时间的上下文。

使用示例

以下是一个使用 context.WithCancel 来取消 goroutine 的示例:

package main

import (
    "context"
    "fmt"
    "time"
)

func worker(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Worker received cancel signal, exiting")
            return
        default:
            fmt.Println("Worker is working")
            time.Sleep(time.Second)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    go worker(ctx)

    time.Sleep(time.Second * 3)
    cancel()
    time.Sleep(time.Second)
    fmt.Println("Main function exiting")
}

在这个示例中,worker 函数通过 select 语句监听 ctx.Done() 通道。当 cancel 函数被调用时,ctx.Done() 通道会收到一个值,从而使 worker 函数退出循环并结束。

并发模式

在实际的并发编程中,有一些常见的并发模式可以帮助我们更有效地组织和管理 goroutine 与通道。

生产者 - 消费者模式

生产者 - 消费者模式是一种经典的并发模式,其中生产者 goroutine 生成数据并将其发送到通道,消费者 goroutine 从通道中接收数据并处理。

以下是一个简单的生产者 - 消费者模式示例:

package main

import (
    "fmt"
)

func producer(ch chan int) {
    for i := 0; i < 10; i++ {
        ch <- i
    }
    close(ch)
}

func consumer(ch chan int) {
    for value := range ch {
        fmt.Println("Consumed:", value)
    }
}

func main() {
    ch := make(chan int)
    go producer(ch)
    go consumer(ch)

    // 防止 main 函数过早退出
    select {}
}

在这个示例中,producer 函数作为生产者,向通道 ch 发送 0 到 9 的整数。consumer 函数作为消费者,从通道中接收数据并打印。

扇入(Fan - In)和扇出(Fan - Out)

扇出是指将一个输入源的数据分发给多个 goroutine 进行处理。扇入则是指将多个 goroutine 的输出合并到一个通道中。

以下是一个扇出和扇入的示例:

package main

import (
    "fmt"
)

func worker(id int, in chan int, out chan int) {
    for value := range in {
        out <- value * id
    }
    close(out)
}

func fanOut(in chan int, numWorkers int) []chan int {
    var outChannels []chan int
    for i := 1; i <= numWorkers; i++ {
        outCh := make(chan int)
        go worker(i, in, outCh)
        outChannels = append(outChannels, outCh)
    }
    return outChannels
}

func fanIn(inChannels []chan int, out chan int) {
    var wg sync.WaitGroup
    for _, ch := range inChannels {
        wg.Add(1)
        go func(c chan int) {
            defer wg.Done()
            for value := range c {
                out <- value
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(out)
    }()
}

func main() {
    in := make(chan int)
    out := make(chan int)

    numWorkers := 3
    outChannels := fanOut(in, numWorkers)
    fanIn(outChannels, out)

    go func() {
        for i := 0; i < 5; i++ {
            in <- i
        }
        close(in)
    }()

    for value := range out {
        fmt.Println("Final result:", value)
    }
}

在这个示例中,fanOut 函数将输入通道 in 的数据分发给 numWorkersworker goroutine 进行处理,每个 worker 将处理结果发送到各自的输出通道。fanIn 函数将这些输出通道的数据合并到一个输出通道 out 中。

并发编程中的错误处理

在并发编程中,错误处理变得更加复杂,因为多个 goroutine 可能同时产生错误,并且需要正确地传递和处理这些错误。

使用通道传递错误

一种常见的方法是使用通道来传递错误。例如,在生产者 - 消费者模式中,如果生产者在生成数据时遇到错误,可以将错误通过通道传递给消费者。

package main

import (
    "fmt"
)

func producer(ch chan int, errCh chan error) {
    for i := 0; i < 10; i++ {
        if i == 5 {
            errCh <- fmt.Errorf("Error at i = 5")
            return
        }
        ch <- i
    }
    close(ch)
}

func consumer(ch chan int, errCh chan error) {
    for {
        select {
        case value, ok := <-ch:
            if!ok {
                return
            }
            fmt.Println("Consumed:", value)
        case err := <-errCh:
            fmt.Println("Error:", err)
            return
        }
    }
}

func main() {
    ch := make(chan int)
    errCh := make(chan error)

    go producer(ch, errCh)
    go consumer(ch, errCh)

    // 防止 main 函数过早退出
    select {}
}

在这个示例中,当 i 等于 5 时,producer 函数向 errCh 通道发送一个错误。consumer 函数通过 select 语句监听 ch 通道和 errCh 通道,当接收到错误时,打印错误信息并退出。

错误处理与上下文

上下文也可以与错误处理结合使用。例如,当一个 goroutine 遇到错误时,可以通过取消上下文来通知其他相关的 goroutine 停止工作。

package main

import (
    "context"
    "fmt"
    "time"
)

func worker(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Worker received cancel signal, exiting")
            return nil
        default:
            fmt.Println("Worker is working")
            time.Sleep(time.Second)
            if someCondition {
                return fmt.Errorf("Error occurred in worker")
            }
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    go func() {
        err := worker(ctx)
        if err != nil {
            fmt.Println("Worker error:", err)
            cancel()
        }
    }()

    time.Sleep(time.Second * 5)
    fmt.Println("Main function exiting")
}

在这个示例中,如果 worker 函数遇到错误,它会打印错误信息并调用 cancel 函数取消上下文,从而通知其他相关的 goroutine 停止工作。

性能优化与调优

在并发编程中,性能优化是一个重要的方面。以下是一些常见的性能优化和调优的方法。

减少锁的竞争

锁的竞争会导致性能下降,因为多个 goroutine 等待获取锁会消耗时间。尽量减少锁的使用范围和持有时间,例如,只在必要的代码段加锁。

package main

import (
    "fmt"
    "sync"
)

var (
    counter int
    mu      sync.Mutex
)

func increment(wg *sync.WaitGroup) {
    defer wg.Done()
    localCounter := 0
    for i := 0; i < 1000; i++ {
        localCounter++
    }
    mu.Lock()
    counter += localCounter
    mu.Unlock()
}

func main() {
    var wg sync.WaitGroup
    numRoutines := 1000

    for i := 0; i < numRoutines; i++ {
        wg.Add(1)
        go increment(&wg)
    }

    wg.Wait()
    fmt.Println("Final counter value:", counter)
}

在这个改进的示例中,每个 goroutine 先在本地进行计数,最后再通过锁将本地计数结果累加到共享的 counter 上,减少了锁的持有时间。

合理使用通道缓冲

通道的缓冲大小会影响性能。如果通道没有缓冲或者缓冲过小,可能会导致频繁的阻塞和唤醒操作。而缓冲过大则可能会占用过多的内存。根据实际情况,合理设置通道的缓冲大小。

例如,在生产者 - 消费者模式中,如果生产者生产数据的速度较快,而消费者处理数据的速度较慢,可以适当增大通道的缓冲大小,避免生产者频繁阻塞。

使用 pprof 进行性能分析

Go 语言提供了 pprof 工具来进行性能分析。可以通过 net/http/pprof 包来收集和分析程序的性能数据,例如 CPU 使用率、内存占用、goroutine 堆栈等。

以下是一个简单的示例,展示如何使用 pprof

package main

import (
    "fmt"
    "net/http"
    _ "net/http/pprof"
)

func main() {
    go func() {
        err := http.ListenAndServe(":6060", nil)
        if err != nil {
            fmt.Println("Error starting pprof server:", err)
        }
    }()

    // 模拟一些工作
    for {
        // 一些计算
    }
}

运行上述程序后,可以通过浏览器访问 http://localhost:6060/debug/pprof/ 来查看性能分析数据。例如,访问 http://localhost:6060/debug/pprof/profile 可以获取 CPU 性能分析数据,使用 go tool pprof 命令可以进一步分析这些数据。

通过合理地应用这些性能优化和调优方法,可以提高 Go 并发程序的性能和效率。在实际开发中,需要根据具体的应用场景和需求,灵活选择和组合这些方法。同时,不断地进行性能测试和分析,以确保程序在高并发情况下的稳定性和高效性。

以上就是 Go 并发编程的入门指南,涵盖了从基础概念到高级技巧和性能优化的各个方面。通过深入理解和实践这些知识,开发者可以编写出高效、稳定的并发程序。希望这份指南能够帮助你在 Go 并发编程的道路上迈出坚实的步伐。