MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Go 语言 Goroutine 的并发模式与任务分解技巧

2021-11-054.2k 阅读

理解 Go 语言中的 Goroutine

Goroutine 基础概念

在 Go 语言中,Goroutine 是一种轻量级的并发执行单元。与操作系统线程相比,Goroutine 的创建和销毁开销非常小。当一个函数被作为 Goroutine 启动时,Go 运行时会为其分配一个独立的执行栈,并在这个栈上执行该函数。与传统线程不同,Goroutine 运行在 Go 运行时的调度器管理的线程之上,调度器负责在多个 Goroutine 之间进行高效的调度。

以下是一个简单的启动 Goroutine 的示例代码:

package main

import (
    "fmt"
    "time"
)

func printHello() {
    fmt.Println("Hello from goroutine")
}

func main() {
    go printHello()
    time.Sleep(1 * time.Second)
    fmt.Println("Main function exiting")
}

在上述代码中,go printHello() 语句启动了一个新的 Goroutine 来执行 printHello 函数。主函数 main 会继续执行,同时新的 Goroutine 也在并行执行。time.Sleep 用于确保在主函数退出之前,Goroutine 有足够的时间执行并打印出信息。

Goroutine 的调度模型

Go 语言的调度模型基于 M:N 模型,即多个 Goroutine 映射到多个操作系统线程上。具体来说,Go 运行时维护了三种实体:G(Goroutine)、M(操作系统线程)和 P(处理器)。

  • G(Goroutine):代表一个并发执行的任务,包含了执行的函数和其执行栈等信息。
  • M(操作系统线程):操作系统提供的线程,负责实际的代码执行。每个 M 会绑定到一个 P 上运行。
  • P(处理器):处理器,用于调度 Goroutine 到 M 上执行。P 维护了一个本地的 Goroutine 队列,同时也可以从全局的 Goroutine 队列中获取任务。

这种调度模型使得 Go 运行时能够高效地管理大量的 Goroutine。当一个 Goroutine 进行阻塞操作(如 I/O 操作)时,其对应的 M 可以被调度器重新分配给其他的 Goroutine,从而避免线程资源的浪费。

Goroutine 的并发模式

生产者 - 消费者模式

生产者 - 消费者模式是一种常见的并发模式,在这种模式中,生产者 Goroutine 生成数据并将其发送到通道(Channel),消费者 Goroutine 从通道中读取数据并进行处理。

下面是一个简单的生产者 - 消费者模式示例:

package main

import (
    "fmt"
)

func producer(ch chan int) {
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch)
}

func consumer(ch chan int) {
    for num := range ch {
        fmt.Printf("Consumed: %d\n", num)
    }
}

func main() {
    ch := make(chan int)
    go producer(ch)
    go consumer(ch)

    // 防止主函数过早退出
    select {}
}

在这个示例中,producer 函数作为生产者,将 0 到 4 的整数发送到通道 ch 中,然后关闭通道。consumer 函数作为消费者,通过 for... range 循环从通道 ch 中读取数据并打印。主函数中启动了生产者和消费者 Goroutine,然后通过 select {} 语句阻塞,防止主函数过早退出。

发布 - 订阅模式

发布 - 订阅模式中,发布者 Goroutine 向通道发布消息,多个订阅者 Goroutine 可以从通道中接收消息。这可以通过广播通道来实现。

package main

import (
    "fmt"
)

func publisher(ch chan<- string) {
    messages := []string{"message1", "message2", "message3"}
    for _, msg := range messages {
        ch <- msg
    }
    close(ch)
}

func subscriber(ch <-chan string, id int) {
    for msg := range ch {
        fmt.Printf("Subscriber %d received: %s\n", id, msg)
    }
}

func main() {
    ch := make(chan string)
    go publisher(ch)

    for i := 1; i <= 3; i++ {
        go subscriber(ch, i)
    }

    // 防止主函数过早退出
    select {}
}

在上述代码中,publisher 函数作为发布者,将消息发送到通道 chsubscriber 函数作为订阅者,从通道 ch 中接收消息并打印。主函数中启动了发布者和三个订阅者 Goroutine,通过 select {} 语句防止主函数过早退出。

扇入(Fan - In)模式

扇入模式是指将多个输入通道的数据合并到一个输出通道。这在需要同时处理多个数据源的场景中非常有用。

package main

import (
    "fmt"
)

func generateData(ch chan int, id int) {
    for i := id * 10; i < (id + 1) * 10; i++ {
        ch <- i
    }
    close(ch)
}

func fanIn(input1, input2 chan int, output chan int) {
    for {
        select {
        case val, ok := <-input1:
            if!ok {
                input1 = nil
            } else {
                output <- val
            }
        case val, ok := <-input2:
            if!ok {
                input2 = nil
            } else {
                output <- val
            }
        }
        if input1 == nil && input2 == nil {
            close(output)
            return
        }
    }
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    output := make(chan int)

    go generateData(ch1, 1)
    go generateData(ch2, 2)
    go fanIn(ch1, ch2, output)

    for val := range output {
        fmt.Printf("Received: %d\n", val)
    }
}

在这个示例中,generateData 函数分别向 ch1ch2 通道生成数据。fanIn 函数将 ch1ch2 通道的数据合并到 output 通道。主函数中启动了数据生成和扇入的 Goroutine,并从 output 通道中读取数据并打印。

扇出(Fan - Out)模式

扇出模式与扇入相反,它将一个输入通道的数据分发到多个输出通道,由不同的 Goroutine 进行处理。

package main

import (
    "fmt"
)

func distributor(input chan int, output1, output2 chan int) {
    for val := range input {
        select {
        case output1 <- val:
        case output2 <- val:
        }
    }
    close(output1)
    close(output2)
}

func processor(ch chan int, id int) {
    for val := range ch {
        fmt.Printf("Processor %d processed: %d\n", id, val)
    }
}

func main() {
    input := make(chan int)
    output1 := make(chan int)
    output2 := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            input <- i
        }
        close(input)
    }()

    go distributor(input, output1, output2)
    go processor(output1, 1)
    go processor(output2, 2)

    // 防止主函数过早退出
    select {}
}

在上述代码中,distributor 函数从 input 通道读取数据,并将数据分发到 output1output2 通道。processor 函数从各自的输出通道读取数据并处理。主函数中启动了数据生成、分发和处理的 Goroutine,并通过 select {} 语句防止主函数过早退出。

任务分解技巧

按功能分解任务

当处理一个复杂的任务时,可以根据功能将其分解为多个子任务,每个子任务由一个 Goroutine 来执行。例如,在一个网络爬虫程序中,可以将获取网页、解析网页和存储数据分别作为不同的子任务。

package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
)

func fetchURL(url string, ch chan string) {
    resp, err := http.Get(url)
    if err!= nil {
        ch <- ""
        return
    }
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    if err!= nil {
        ch <- ""
        return
    }
    ch <- string(body)
}

func parseHTML(html string) string {
    // 简单示例,实际应使用更复杂的解析逻辑
    return fmt.Sprintf("Parsed: %s", html[:10])
}

func storeData(data string) {
    fmt.Printf("Stored: %s\n", data)
}

func main() {
    url := "https://example.com"
    ch := make(chan string)

    go fetchURL(url, ch)
    html := <-ch
    if html!= "" {
        parsed := parseHTML(html)
        go storeData(parsed)
    }
}

在这个示例中,fetchURL 函数负责获取网页内容,parseHTML 函数负责解析网页,storeData 函数负责存储数据。主函数中启动了获取网页的 Goroutine,获取到网页内容后进行解析,然后启动存储数据的 Goroutine。

按数据块分解任务

如果任务需要处理大量的数据,可以将数据按块进行划分,每个块由一个 Goroutine 来处理。例如,在处理一个大文件时,可以按行或者按固定大小的数据块进行划分。

package main

import (
    "bufio"
    "fmt"
    "os"
)

func processLines(filePath string, start, end int, ch chan string) {
    file, err := os.Open(filePath)
    if err!= nil {
        ch <- ""
        return
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)
    lineNum := 0
    result := ""
    for scanner.Scan() {
        lineNum++
        if lineNum >= start && lineNum <= end {
            result += scanner.Text() + "\n"
        }
    }
    ch <- result
}

func main() {
    filePath := "large_file.txt"
    numGoroutines := 3
    file, err := os.Open(filePath)
    if err!= nil {
        fmt.Println("Error opening file:", err)
        return
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)
    lineCount := 0
    for scanner.Scan() {
        lineCount++
    }

    chunkSize := (lineCount + numGoroutines - 1) / numGoroutines
    chs := make([]chan string, numGoroutines)
    for i := 0; i < numGoroutines; i++ {
        chs[i] = make(chan string)
        start := i * chunkSize + 1
        end := (i + 1) * chunkSize
        if i == numGoroutines - 1 {
            end = lineCount
        }
        go processLines(filePath, start, end, chs[i])
    }

    for i := 0; i < numGoroutines; i++ {
        result := <-chs[i]
        fmt.Printf("Result from goroutine %d: %s\n", i, result)
    }
}

在上述代码中,processLines 函数负责处理文件中指定行范围的数据块。主函数中根据文件行数和 Goroutine 数量计算每个数据块的范围,然后启动多个 Goroutine 分别处理不同的数据块,并从各个通道中获取处理结果。

层次化任务分解

对于非常复杂的任务,可以采用层次化的任务分解方式。将任务首先分解为几个主要的子任务,然后每个子任务再进一步分解为更小的子任务,以此类推。

例如,在一个大型的数据分析项目中,首先可以将任务分解为数据采集、数据清洗和数据分析三个主要子任务。数据采集子任务又可以进一步分解为从不同数据源采集数据的子任务,数据清洗子任务可以分解为去除噪声、处理缺失值等子任务,数据分析子任务可以分解为不同指标的计算和模型训练等子任务。

package main

import (
    "fmt"
)

// 模拟数据采集
func dataCollection(source string, ch chan string) {
    data := fmt.Sprintf("Data from %s", source)
    ch <- data
}

// 模拟数据清洗
func dataCleaning(data string, ch chan string) {
    cleanedData := fmt.Sprintf("Cleaned: %s", data)
    ch <- cleanedData
}

// 模拟数据分析
func dataAnalysis(data string) {
    fmt.Printf("Analyzed: %s\n", data)
}

func main() {
    sources := []string{"source1", "source2", "source3"}
    dataChs := make([]chan string, len(sources))
    for i := range sources {
        dataChs[i] = make(chan string)
        go dataCollection(sources[i], dataChs[i])
    }

    cleanedChs := make([]chan string, len(sources))
    for i := range sources {
        cleanedChs[i] = make(chan string)
        go dataCleaning(<-dataChs[i], cleanedChs[i])
    }

    for i := range sources {
        go dataAnalysis(<-cleanedChs[i])
    }

    // 防止主函数过早退出
    select {}
}

在这个示例中,首先启动多个数据采集的 Goroutine,然后针对每个采集到的数据启动数据清洗的 Goroutine,最后对清洗后的数据启动数据分析的 Goroutine。通过这种层次化的任务分解,使得复杂的数据分析任务能够有条不紊地并行执行。

处理 Goroutine 间的同步与通信

使用通道进行同步

通道不仅可以用于数据传递,还可以用于 Goroutine 之间的同步。例如,当一个 Goroutine 完成某项任务后,通过通道向其他 Goroutine 发送信号。

package main

import (
    "fmt"
)

func task(ch chan struct{}) {
    fmt.Println("Task started")
    // 模拟任务执行
    // 这里可以是复杂的计算或者 I/O 操作
    fmt.Println("Task completed")
    ch <- struct{}{}
}

func main() {
    ch := make(chan struct{})
    go task(ch)
    <-ch
    fmt.Println("Main function knows task is completed")
}

在上述代码中,task 函数在完成任务后,向通道 ch 发送一个空结构体,主函数通过从通道 ch 接收数据来得知任务已经完成。

使用 WaitGroup 进行同步

WaitGroup 是 Go 标准库提供的一种同步机制,用于等待一组 Goroutine 完成。

package main

import (
    "fmt"
    "sync"
)

func worker(wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Println("Worker started")
    // 模拟任务执行
    fmt.Println("Worker completed")
}

func main() {
    var wg sync.WaitGroup
    numWorkers := 3
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    fmt.Println("All workers completed")
}

在这个示例中,WaitGroupAdd 方法用于增加等待的 Goroutine 数量,Done 方法用于标记一个 Goroutine 完成,Wait 方法用于阻塞当前 Goroutine,直到所有标记为等待的 Goroutine 都调用了 Done 方法。

使用互斥锁(Mutex)保护共享资源

当多个 Goroutine 需要访问共享资源时,为了避免数据竞争,需要使用互斥锁(Mutex)来保护共享资源。

package main

import (
    "fmt"
    "sync"
)

var (
    counter int
    mutex   sync.Mutex
)

func increment(wg *sync.WaitGroup) {
    defer wg.Done()
    mutex.Lock()
    counter++
    mutex.Unlock()
}

func main() {
    var wg sync.WaitGroup
    numRoutines := 1000
    for i := 0; i < numRoutines; i++ {
        wg.Add(1)
        go increment(&wg)
    }
    wg.Wait()
    fmt.Printf("Final counter value: %d\n", counter)
}

在上述代码中,mutex 用于保护 counter 这个共享资源。Lock 方法用于锁定互斥锁,确保在同一时间只有一个 Goroutine 可以访问共享资源,Unlock 方法用于解锁互斥锁,允许其他 Goroutine 访问。

错误处理与 Goroutine

在 Goroutine 中处理错误

当在 Goroutine 中发生错误时,需要一种机制将错误传递出来。通道可以用于传递错误信息。

package main

import (
    "fmt"
)

func divide(a, b int, resultCh chan int, errCh chan error) {
    if b == 0 {
        errCh <- fmt.Errorf("division by zero")
        return
    }
    resultCh <- a / b
}

func main() {
    resultCh := make(chan int)
    errCh := make(chan error)

    go divide(10, 2, resultCh, errCh)

    select {
    case result := <-resultCh:
        fmt.Printf("Result: %d\n", result)
    case err := <-errCh:
        fmt.Printf("Error: %v\n", err)
    }
    close(resultCh)
    close(errCh)
}

在这个示例中,divide 函数在发生除零错误时,通过 errCh 通道传递错误信息,否则通过 resultCh 通道传递计算结果。主函数通过 select 语句来处理结果或错误。

处理多个 Goroutine 中的错误

当有多个 Goroutine 执行任务时,可能需要收集所有 Goroutine 中的错误。可以使用一个专门的 Goroutine 来收集错误。

package main

import (
    "fmt"
    "sync"
)

func worker(id int, errCh chan error) {
    if id == 2 {
        errCh <- fmt.Errorf("worker %d failed", id)
    } else {
        errCh <- nil
    }
}

func main() {
    numWorkers := 3
    errCh := make(chan error, numWorkers)
    var wg sync.WaitGroup

    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            worker(id, errCh)
        }(i)
    }

    go func() {
        wg.Wait()
        close(errCh)
    }()

    allErrs := []error{}
    for err := range errCh {
        if err!= nil {
            allErrs = append(allErrs, err)
        }
    }

    if len(allErrs) > 0 {
        fmt.Println("Errors occurred:")
        for _, err := range allErrs {
            fmt.Println(err)
        }
    } else {
        fmt.Println("All workers completed successfully")
    }
}

在上述代码中,每个 worker Goroutine 将错误发送到 errCh 通道。一个专门的 Goroutine 在所有 worker Goroutine 完成后关闭 errCh 通道。主函数通过遍历 errCh 通道来收集所有的错误,并进行相应的处理。

优化 Goroutine 的使用

合理设置 Goroutine 数量

过多的 Goroutine 可能会导致调度开销增大,过少的 Goroutine 则无法充分利用系统资源。可以根据任务的性质和系统的 CPU 核心数来合理设置 Goroutine 的数量。

例如,对于 CPU 密集型任务,可以将 Goroutine 的数量设置为与 CPU 核心数相近,以避免过多的上下文切换开销。对于 I/O 密集型任务,可以适当增加 Goroutine 的数量,因为在 I/O 阻塞期间,其他 Goroutine 可以继续执行。

package main

import (
    "fmt"
    "runtime"
    "sync"
)

func cpuIntensiveTask(wg *sync.WaitGroup) {
    defer wg.Done()
    // 模拟 CPU 密集型计算
    for i := 0; i < 1000000000; i++ {
        _ = i * i
    }
}

func main() {
    numCPU := runtime.NumCPU()
    var wg sync.WaitGroup
    for i := 0; i < numCPU; i++ {
        wg.Add(1)
        go cpuIntensiveTask(&wg)
    }
    wg.Wait()
    fmt.Println("All CPU - intensive tasks completed")
}

在这个示例中,根据 CPU 核心数启动相应数量的 CPU 密集型任务 Goroutine。

避免 Goroutine 泄漏

Goroutine 泄漏是指当一个 Goroutine 无限期地运行且无法被终止时,会占用系统资源。常见的导致 Goroutine 泄漏的情况包括在通道操作中没有正确处理关闭通道,或者在函数中没有正确返回。

package main

import (
    "fmt"
)

func badGoroutine() {
    ch := make(chan int)
    go func() {
        for {
            select {
            case val := <-ch:
                fmt.Printf("Received: %d\n", val)
            }
        }
    }()
    // 这里没有关闭通道,导致 goroutine 泄漏
}

func goodGoroutine() {
    ch := make(chan int)
    go func() {
        for val := range ch {
            fmt.Printf("Received: %d\n", val)
        }
    }()
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch)
}

func main() {
    // badGoroutine()
    goodGoroutine()
}

badGoroutine 函数中,由于没有关闭通道,内部的 Goroutine 会一直阻塞在 select 语句中等待数据,从而导致 Goroutine 泄漏。而在 goodGoroutine 函数中,正确地关闭了通道,避免了泄漏。

高效使用通道

通道的缓冲区大小设置会影响 Goroutine 的执行效率。无缓冲通道会在发送和接收操作时阻塞,直到对应的操作发生,这有助于保证数据的同步和顺序。有缓冲通道则可以在缓冲区未满时非阻塞地发送数据。

package main

import (
    "fmt"
)

func useUnbufferedChannel() {
    ch := make(chan int)
    go func() {
        fmt.Println("Sending 10")
        ch <- 10
    }()
    val := <-ch
    fmt.Printf("Received: %d\n", val)
}

func useBufferedChannel() {
    ch := make(chan int, 1)
    fmt.Println("Sending 20")
    ch <- 20
    val := <-ch
    fmt.Printf("Received: %d\n", val)
}

func main() {
    useUnbufferedChannel()
    useBufferedChannel()
}

useUnbufferedChannel 函数中,发送数据的 Goroutine 会阻塞,直到有其他 Goroutine 从通道接收数据。而在 useBufferedChannel 函数中,由于通道有缓冲区,发送操作可以非阻塞地执行。根据实际需求合理设置通道的缓冲区大小,可以提高程序的性能。

通过深入理解 Goroutine 的并发模式和任务分解技巧,并合理处理同步、错误以及优化使用,开发者能够充分发挥 Go 语言并发编程的强大功能,构建高效、健壮的应用程序。无论是小型的工具程序还是大型的分布式系统,这些技术都能为程序的设计和实现提供有力的支持。