MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Go 语言通道的底层实现与并发编程模式

2021-03-225.0k 阅读

Go 语言通道的底层实现

通道的数据结构

在 Go 语言中,通道(channel)是一种用于在 goroutine 之间进行通信和同步的数据结构。其底层数据结构在源码 src/runtime/chan.go 中定义。

type hchan struct {
	qcount   uint           // 当前队列中剩余元素个数
	dataqsiz uint           // 环形队列的大小
	buf      unsafe.Pointer // 环形队列的指针
	elemsize uint16         // 每个元素的大小
	closed   uint32         // 标识通道是否关闭
	elemtype *_type         // 元素的类型
	sendx    uint           // 发送操作在环形队列中的索引
	recvx    uint           // 接收操作在环形队列中的索引
	recvq    waitq          // 等待接收数据的 goroutine 队列
	sendq    waitq          // 等待发送数据的 goroutine 队列

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

这里,qcount 表示当前通道中已经存储的元素数量,dataqsiz 是通道缓冲区(如果有)的大小。buf 是一个指向实际存储数据的环形缓冲区的指针。elemtype 定义了通道中元素的类型,elemsize 则表示每个元素的大小。sendxrecvx 分别是发送和接收操作在环形缓冲区中的索引。sendqrecvq 是两个等待队列,分别用于存储等待发送和接收数据的 goroutine。closed 用于标记通道是否已经关闭。

无缓冲通道的发送操作

当一个 goroutine 尝试向无缓冲通道发送数据时,会执行以下步骤:

  1. 获取通道的锁 lock
  2. 检查通道是否已经关闭。如果通道已关闭,会触发 panic,提示向已关闭的通道发送数据。
  3. 检查接收队列 recvq 是否为空。如果不为空,说明有 goroutine 正在等待接收数据。从接收队列中取出一个等待接收的 goroutine(sudog),将数据直接发送给这个 goroutine,然后释放锁并返回。
  4. 如果接收队列 recvq 为空,将当前 goroutine 放入发送队列 sendq,然后将当前 goroutine 挂起,等待被唤醒。

下面是简化后的代码示例来模拟这个过程:

package main

import (
    "fmt"
)

func main() {
    ch := make(chan int)
    go func() {
        // 模拟发送操作
        value := 42
        select {
        case <-ch:
            // 通道有接收者,直接发送数据
            ch <- value
        default:
            // 没有接收者,将自己放入发送队列并挂起
            fmt.Println("No receiver, will block")
            ch <- value
        }
    }()
    // 模拟接收操作
    go func() {
        data := <-ch
        fmt.Println("Received:", data)
    }()
    select {}
}

无缓冲通道的接收操作

当一个 goroutine 尝试从无缓冲通道接收数据时:

  1. 获取通道的锁 lock
  2. 检查通道是否已经关闭且当前通道中没有数据。如果是,返回零值并设置标志位表示通道已关闭。
  3. 检查发送队列 sendq 是否为空。如果不为空,说明有 goroutine 正在等待发送数据。从发送队列中取出一个等待发送的 goroutine(sudog),接收其发送的数据,然后唤醒这个 goroutine,释放锁并返回。
  4. 如果发送队列 sendq 为空,将当前 goroutine 放入接收队列 recvq,然后将当前 goroutine 挂起,等待被唤醒。

以下代码展示了接收操作的模拟:

package main

import (
    "fmt"
)

func main() {
    ch := make(chan int)
    go func() {
        // 模拟发送操作
        ch <- 42
    }()
    // 模拟接收操作
    select {
    case data := <-ch:
        fmt.Println("Received:", data)
    default:
        fmt.Println("No sender, will block")
        data := <-ch
        fmt.Println("Received:", data)
    }
}

有缓冲通道的发送操作

有缓冲通道在发送数据时与无缓冲通道有所不同。

  1. 获取通道的锁 lock
  2. 检查通道是否已经关闭。如果通道已关闭,会触发 panic,提示向已关闭的通道发送数据。
  3. 检查通道的缓冲区是否已满(qcount == dataqsiz)。如果缓冲区未满,将数据放入缓冲区(根据 sendx 索引),sendx 递增(如果 sendx 达到 dataqsiz,则重置为 0),qcount 加 1,释放锁并返回。
  4. 如果缓冲区已满,将当前 goroutine 放入发送队列 sendq,然后将当前 goroutine 挂起,等待被唤醒。

示例代码如下:

package main

import (
    "fmt"
)

func main() {
    ch := make(chan int, 2)
    go func() {
        // 模拟发送操作
        for i := 0; i < 3; i++ {
            select {
            case ch <- i:
                fmt.Printf("Sent %d to channel\n", i)
            default:
                fmt.Printf("Channel full, can't send %d\n", i)
            }
        }
    }()
    // 模拟接收操作
    go func() {
        for i := 0; i < 3; i++ {
            data := <-ch
            fmt.Printf("Received %d from channel\n", data)
        }
    }()
    select {}
}

有缓冲通道的接收操作

对于有缓冲通道的接收操作:

  1. 获取通道的锁 lock
  2. 检查通道是否已经关闭且当前通道中没有数据。如果是,返回零值并设置标志位表示通道已关闭。
  3. 检查通道的缓冲区是否为空(qcount == 0)。如果缓冲区不为空,从缓冲区中取出数据(根据 recvx 索引),recvx 递增(如果 recvx 达到 dataqsiz,则重置为 0),qcount 减 1,释放锁并返回。
  4. 如果缓冲区为空,将当前 goroutine 放入接收队列 recvq,然后将当前 goroutine 挂起,等待被唤醒。

示例代码如下:

package main

import (
    "fmt"
)

func main() {
    ch := make(chan int, 2)
    go func() {
        // 模拟发送操作
        ch <- 10
        ch <- 20
    }()
    // 模拟接收操作
    go func() {
        select {
        case data := <-ch:
            fmt.Printf("Received %d from channel\n", data)
        default:
            fmt.Println("Channel empty, can't receive")
        }
        data := <-ch
        fmt.Printf("Received %d from channel\n", data)
    }()
    select {}
}

Go 语言并发编程模式

生产者 - 消费者模式

生产者 - 消费者模式是一种常见的并发编程模式,在 Go 语言中通过通道可以很方便地实现。生产者 goroutine 生成数据并将其发送到通道,消费者 goroutine 从通道接收数据并进行处理。

package main

import (
    "fmt"
)

func producer(ch chan int) {
    for i := 0; i < 10; i++ {
        ch <- i
        fmt.Printf("Produced %d\n", i)
    }
    close(ch)
}

func consumer(ch chan int) {
    for data := range ch {
        fmt.Printf("Consumed %d\n", data)
    }
}

func main() {
    ch := make(chan int)
    go producer(ch)
    go consumer(ch)
    select {}
}

在这个示例中,producer 函数生成数字并发送到通道 chconsumer 函数从通道 ch 接收数字并打印。producer 完成数据发送后关闭通道,consumer 通过 for... range 循环可以优雅地处理通道关闭的情况。

扇入(Fan - In)模式

扇入模式是指将多个输入源的数据合并到一个输出通道。假设有多个生产者,每个生产者将数据发送到自己的通道,然后通过扇入模式将这些通道的数据合并到一个通道。

package main

import (
    "fmt"
)

func producer1(ch chan int) {
    for i := 0; i < 5; i++ {
        ch <- i * 2
        fmt.Printf("Producer1 produced %d\n", i*2)
    }
    close(ch)
}

func producer2(ch chan int) {
    for i := 0; i < 5; i++ {
        ch <- i * 3
        fmt.Printf("Producer2 produced %d\n", i*3)
    }
    close(ch)
}

func fanIn(ch1, ch2 chan int, out chan int) {
    for {
        select {
        case data, ok := <-ch1:
            if!ok {
                ch1 = nil
                continue
            }
            out <- data
        case data, ok := <-ch2:
            if!ok {
                ch2 = nil
                continue
            }
            out <- data
        }
        if ch1 == nil && ch2 == nil {
            close(out)
            return
        }
    }
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    out := make(chan int)
    go producer1(ch1)
    go producer2(ch2)
    go fanIn(ch1, ch2, out)
    for data := range out {
        fmt.Printf("Received from fan - in: %d\n", data)
    }
}

在这个示例中,producer1producer2 分别向 ch1ch2 发送数据,fanIn 函数将 ch1ch2 的数据合并到 out 通道。

扇出(Fan - Out)模式

扇出模式与扇入相反,它将一个输入源的数据分发到多个输出通道。假设有一个生产者,其数据需要被多个消费者处理。

package main

import (
    "fmt"
)

func producer(ch chan int) {
    for i := 0; i < 10; i++ {
        ch <- i
        fmt.Printf("Produced %d\n", i)
    }
    close(ch)
}

func fanOut(in chan int, out1, out2 chan int) {
    for data := range in {
        select {
        case out1 <- data:
            fmt.Printf("Sent %d to out1\n", data)
        case out2 <- data:
            fmt.Printf("Sent %d to out2\n", data)
        }
    }
    close(out1)
    close(out2)
}

func consumer1(ch chan int) {
    for data := range ch {
        fmt.Printf("Consumer1 received %d\n", data)
    }
}

func consumer2(ch chan int) {
    for data := range ch {
        fmt.Printf("Consumer2 received %d\n", data)
    }
}

func main() {
    in := make(chan int)
    out1 := make(chan int)
    out2 := make(chan int)
    go producer(in)
    go fanOut(in, out1, out2)
    go consumer1(out1)
    go consumer2(out2)
    select {}
}

在这个示例中,producerin 通道发送数据,fanOut 函数将 in 通道的数据分发到 out1out2 通道,consumer1consumer2 分别从 out1out2 通道接收数据。

流水线(Pipeline)模式

流水线模式是将多个处理步骤连接成一个流水线,每个步骤作为一个 goroutine,前一个步骤的输出作为后一个步骤的输入。

package main

import (
    "fmt"
)

func step1(chIn chan int, chOut chan int) {
    for data := range chIn {
        result := data * 2
        chOut <- result
        fmt.Printf("Step1 processed %d to %d\n", data, result)
    }
    close(chOut)
}

func step2(chIn chan int, chOut chan int) {
    for data := range chIn {
        result := data + 10
        chOut <- result
        fmt.Printf("Step2 processed %d to %d\n", data, result)
    }
    close(chOut)
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go func() {
        for i := 0; i < 5; i++ {
            ch1 <- i
        }
        close(ch1)
    }()
    go step1(ch1, ch2)
    go func() {
        for data := range ch2 {
            fmt.Printf("Final result: %d\n", data)
        }
    }()
    select {}
}

在这个示例中,step1ch1 接收数据并进行处理,将结果发送到 ch2step2ch2 接收数据并进一步处理,最终输出结果。

多路复用(Multiplexing)模式

多路复用模式通过 select 语句实现。select 语句可以同时监听多个通道的操作(发送或接收),当其中一个通道操作准备好时,就执行相应的分支。

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)

    go func() {
        time.Sleep(2 * time.Second)
        ch1 <- 42
    }()

    go func() {
        time.Sleep(1 * time.Second)
        ch2 <- 100
    }()

    select {
    case data := <-ch1:
        fmt.Printf("Received from ch1: %d\n", data)
    case data := <-ch2:
        fmt.Printf("Received from ch2: %d\n", data)
    case <-time.After(3 * time.Second):
        fmt.Println("Timeout")
    }
}

在这个示例中,select 语句同时监听 ch1ch2 通道的接收操作,并且设置了一个 3 秒的超时。如果 ch2 先准备好,就接收 ch2 的数据;如果 ch1 先准备好,就接收 ch1 的数据;如果 3 秒内两个通道都没有准备好,就执行超时分支。

通过深入理解 Go 语言通道的底层实现和这些并发编程模式,开发者可以更加高效地编写并发程序,充分利用多核处理器的性能,提高程序的执行效率和响应能力。无论是开发高性能的网络服务,还是处理大规模数据的并行计算,这些知识都将是非常宝贵的。