MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Go语言channel的使用与底层实现原理

2021-06-172.4k 阅读

Go语言channel基础使用

Go语言的并发模型是基于CSP(Communicating Sequential Processes)模型实现的,而channel则是这个模型的具体体现,它用于在多个goroutine之间进行通信和同步。

创建channel

创建一个channel非常简单,使用make关键字。例如,创建一个用于传递整数的channel:

package main

import "fmt"

func main() {
    ch := make(chan int)
    fmt.Printf("Type of ch: %T\n", ch)
}

上述代码中,make(chan int)创建了一个只能传递整数类型数据的channel。chan关键字用于声明channel类型,其基本语法为chan Type,其中Type是channel中传递的数据类型。

如果要创建一个带缓冲的channel,可以在make函数中传入第二个参数,该参数表示缓冲区的大小。例如:

package main

import "fmt"

func main() {
    ch := make(chan int, 5)
    fmt.Printf("Buffer size of ch: %d\n", cap(ch))
}

这里创建了一个缓冲区大小为5的channel,cap函数用于获取channel的缓冲区容量。

发送和接收数据

向channel发送数据使用<-操作符,从channel接收数据同样使用<-操作符。

发送数据的示例:

package main

import "fmt"

func main() {
    ch := make(chan int)
    go func() {
        ch <- 42
        close(ch)
    }()
    data := <-ch
    fmt.Println("Received data:", data)
}

在上述代码中,首先创建了一个无缓冲的channel ch。然后启动一个匿名goroutine,在这个goroutine中向ch发送数据42,并在发送完成后关闭channel。主goroutine从ch中接收数据并打印。

接收数据的另一种常见方式是使用for - range循环,这种方式会在channel关闭时自动结束循环。例如:

package main

import "fmt"

func main() {
    ch := make(chan int)
    go func() {
        for i := 0; i < 5; i++ {
            ch <- i
        }
        close(ch)
    }()
    for data := range ch {
        fmt.Println("Received data:", data)
    }
}

在这个例子中,匿名goroutine向ch发送0到4的数据,主goroutine通过for - range循环从ch中接收数据,直到ch关闭。

单向channel

有时候,我们希望限制channel只能用于发送或接收数据,这就需要用到单向channel。

声明一个只用于发送数据的单向channel:

var sendOnly chan<- int

声明一个只用于接收数据的单向channel:

var receiveOnly <-chan int

单向channel通常用于函数参数,以明确函数对channel的使用方式。例如:

package main

import "fmt"

func sendData(ch chan<- int) {
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch)
}

func receiveData(ch <-chan int) {
    for data := range ch {
        fmt.Println("Received data:", data)
    }
}

func main() {
    ch := make(chan int)
    go sendData(ch)
    receiveData(ch)
}

在上述代码中,sendData函数的参数ch是一个只用于发送数据的单向channel,receiveData函数的参数ch是一个只用于接收数据的单向channel。这样可以在函数定义时就明确channel的使用方向,增加代码的可读性和安全性。

Go语言channel底层实现原理

要深入理解channel的底层实现,我们需要从它的数据结构开始分析。

channel的数据结构

在Go语言的运行时源码(src/runtime/chan.go)中,hchan结构体定义了channel的内部表示:

type hchan struct {
    qcount   uint           // 当前队列中剩余元素个数
    dataqsiz uint           // 环形队列的大小
    buf      unsafe.Pointer // 指向环形队列的指针
    elemsize uint16
    closed   uint32
    elemtype *_type // 元素类型
    sendx    uint   // 发送操作的索引
    recvx    uint   // 接收操作的索引
    recvq    waitq  // 接收等待队列
    sendq    waitq  // 发送等待队列

    // 保护以上字段的互斥锁
    lock mutex
}
  • qcount:表示当前channel缓冲区中已有的数据元素个数。
  • dataqsiz:表示channel缓冲区的大小,即可以容纳的数据元素的最大数量。
  • buf:是一个指向缓冲区的指针,缓冲区是一个环形队列,用于存储数据。
  • elemsize:表示每个数据元素的大小,以字节为单位。
  • closed:一个标志位,用于表示channel是否已经关闭。
  • elemtype:指向数据元素类型的元信息,包含类型大小、对齐方式等信息。
  • sendx:表示下一次发送操作应该写入数据的位置在缓冲区中的索引。
  • recvx:表示下一次接收操作应该读取数据的位置在缓冲区中的索引。
  • recvq:是一个等待队列,存储等待从该channel接收数据的goroutine。
  • sendq:也是一个等待队列,存储等待向该channel发送数据的goroutine。
  • lock:是一个互斥锁,用于保护hchan结构体中各个字段的并发访问。

发送操作的底层实现

当一个goroutine执行向channel发送数据的操作(ch <- value)时,其底层实现步骤如下:

  1. 获取锁:首先,goroutine会获取hchan结构体的锁,以确保对缓冲区和等待队列的操作是线程安全的。
  2. 检查channel是否关闭:如果channel已经关闭,并且缓冲区为空,会抛出panic: send on closed channel异常。
  3. 检查缓冲区是否有空间
    • 如果缓冲区未满(qcount < dataqsiz),会将数据直接写入缓冲区中sendx指向的位置,然后sendx加1(如果sendx达到dataqsiz,则重置为0,以实现环形队列),qcount加1,最后释放锁。
    • 如果缓冲区已满,会将当前goroutine放入sendq等待队列,然后释放锁并阻塞当前goroutine,直到有其他goroutine从channel接收数据,将其唤醒。
  4. 唤醒等待接收的goroutine:如果recvq等待队列不为空(即有goroutine在等待接收数据),会从recvq中取出一个等待的goroutine,将数据直接发送给它,而不经过缓冲区。然后唤醒该goroutine,最后释放锁。

下面是简化的发送操作伪代码:

func chansend(c *hchan, ep unsafe.Pointer, block bool) {
    lock(&c.lock)
    if c.closed != 0 && c.qcount == 0 {
        unlock(&c.lock)
        panic("send on closed channel")
    }
    if c.qcount < c.dataqsiz {
        // 缓冲区有空间,写入缓冲区
        qp := (*[1 << 16]byte)(c.buf) + c.sendx*c.elemsize
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return
    } else if!block {
        // 缓冲区已满且非阻塞发送
        unlock(&c.lock)
        return false
    }
    // 缓冲区已满,将当前goroutine加入sendq等待队列
    gp := getg()
    c.sendq.enqueue(gp)
    goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 2)
    // 被唤醒后继续执行
    if c.closed != 0 {
        panic("send on closed channel")
    }
    return true
}

接收操作的底层实现

当一个goroutine执行从channel接收数据的操作(data := <-ch)时,其底层实现步骤如下:

  1. 获取锁:同样,首先获取hchan结构体的锁。
  2. 检查channel是否关闭:如果channel已经关闭且缓冲区为空,会返回零值(数据类型的默认值)。
  3. 检查缓冲区是否有数据
    • 如果缓冲区不为空(qcount > 0),会从缓冲区中recvx指向的位置读取数据,然后recvx加1(如果recvx达到dataqsiz,则重置为0),qcount减1,最后释放锁并返回数据。
    • 如果缓冲区为空,会将当前goroutine放入recvq等待队列,然后释放锁并阻塞当前goroutine,直到有其他goroutine向channel发送数据,将其唤醒。
  4. 唤醒等待发送的goroutine:如果sendq等待队列不为空(即有goroutine在等待发送数据),会从sendq中取出一个等待的goroutine,直接接收它发送的数据,而不经过缓冲区。然后唤醒该goroutine,最后释放锁。

下面是简化的接收操作伪代码:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    lock(&c.lock)
    if c.closed != 0 && c.qcount == 0 {
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        unlock(&c.lock)
        return true, false
    }
    if c.qcount > 0 {
        // 缓冲区有数据,从缓冲区读取
        qp := (*[1 << 16]byte)(c.buf) + c.recvx*c.elemsize
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    } else if!block {
        // 缓冲区为空且非阻塞接收
        unlock(&c.lock)
        return false, false
    }
    // 缓冲区为空,将当前goroutine加入recvq等待队列
    gp := getg()
    c.recvq.enqueue(gp)
    goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 2)
    // 被唤醒后继续执行
    if c.closed != 0 && c.qcount == 0 {
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }
    // 从发送者获取数据
    qp := (*[1 << 16]byte)(c.buf) + c.recvx*c.elemsize
    if ep != nil {
        typedmemmove(c.elemtype, ep, qp)
    }
    c.recvx++
    if c.recvx == c.dataqsiz {
        c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
}

关闭channel的底层实现

关闭channel(close(ch))的操作在底层会执行以下步骤:

  1. 获取锁:首先获取hchan结构体的锁。
  2. 检查channel是否已经关闭:如果channel已经关闭,直接返回。
  3. 设置关闭标志:将closed字段设置为1,表示channel已关闭。
  4. 唤醒等待的goroutine
    • 遍历sendq等待队列,将所有等待发送的goroutine唤醒,这些goroutine会收到send on closed channelpanic
    • 遍历recvq等待队列,将所有等待接收的goroutine唤醒,这些goroutine会收到零值(数据类型的默认值)。
  5. 释放锁:最后释放锁。

下面是简化的关闭channel操作伪代码:

func closechan(c *hchan) {
    lock(&c.lock)
    if c.closed != 0 {
        unlock(&c.lock)
        panic("close of closed channel")
    }
    c.closed = 1
    for {
        gp := c.sendq.dequeue()
        if gp == nil {
            break
        }
        if!gp.m.curg {
            throw("chan: sendq g is not executing")
        }
        gp.schedlink = 0
        goready(gp, 3)
    }
    for {
        gp := c.recvq.dequeue()
        if gp == nil {
            break
        }
        if!gp.m.curg {
            throw("chan: recvq g is not executing")
        }
        gp.schedlink = 0
        goready(gp, 3)
    }
    unlock(&c.lock)
}

通过对channel底层数据结构和操作原理的深入分析,我们可以更好地理解Go语言并发编程中这一核心机制,从而编写出更加高效、健壮的并发程序。在实际应用中,合理地使用channel进行goroutine之间的通信和同步,能够避免很多传统并发编程中的难题,如竞态条件和死锁等。同时,深入了解底层原理也有助于我们在性能调优时做出更明智的决策。例如,如果我们知道channel的发送和接收操作在缓冲区满或空时的阻塞机制,就可以根据实际需求来调整缓冲区大小,以提高程序的并发性能。另外,在处理大量数据的场景下,对channel等待队列的理解可以帮助我们优化资源的使用,避免不必要的goroutine阻塞和唤醒开销。总之,掌握channel的底层实现原理是成为一名优秀Go语言开发者的关键一步。