MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Go信号量实现的详细过程

2023-10-111.4k 阅读

什么是信号量

在并发编程中,信号量(Semaphore)是一个整型变量,它通过计数器来控制对共享资源的访问。信号量的值表示当前可用的共享资源数量。当一个线程想要访问共享资源时,它会尝试获取信号量(将计数器减1)。如果计数器的值大于0,说明有可用资源,线程可以获取信号量并继续执行;如果计数器的值为0,说明资源已被占用,线程需要等待,直到有其他线程释放信号量(将计数器加1)。

信号量主要有两个操作:P操作(也叫wait操作)和Q操作(也叫signal操作)。P操作会尝试获取信号量,即减少信号量的值。如果信号量的值为0,P操作会阻塞调用线程,直到信号量的值变为大于0。Q操作则是释放信号量,增加信号量的值,同时唤醒一个等待该信号量的线程(如果有线程在等待)。

信号量常用于解决以下问题:

  1. 资源限制:控制对有限资源的并发访问。例如,数据库连接池中的连接数量是有限的,通过信号量可以确保同时使用连接的线程数量不超过连接池的大小。
  2. 同步协作:在多个线程之间进行同步。例如,生产者 - 消费者模型中,生产者线程在向缓冲区添加数据后,可以通过信号量通知消费者线程有新数据可用。

Go 语言中的并发编程基础

Goroutine

Goroutine 是 Go 语言中实现并发的核心机制。它类似于线程,但更轻量级。创建一个 Goroutine 非常简单,只需在函数调用前加上 go 关键字。例如:

package main

import (
    "fmt"
    "time"
)

func say(s string) {
    for i := 0; i < 5; i++ {
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func main() {
    go say("world")
    say("hello")
}

在上述代码中,go say("world") 启动了一个新的 Goroutine 来执行 say("world") 函数,而 say("hello") 则在主 Goroutine 中执行。两个函数并发执行,交替输出 "hello" 和 "world"。

Channel

Channel 是 Go 语言中用于在 Goroutine 之间进行通信和同步的机制。它可以被看作是一个管道,数据可以从一端发送,从另一端接收。创建一个 Channel 可以使用内置的 make 函数,例如:

ch := make(chan int)

发送数据到 Channel 使用 <- 操作符:

ch <- 10

从 Channel 接收数据也使用 <- 操作符:

value := <-ch

Channel 可以是带缓冲的或不带缓冲的。不带缓冲的 Channel 要求发送和接收操作必须同时准备好,否则会阻塞。带缓冲的 Channel 允许在缓冲区未满时发送数据,或者在缓冲区不为空时接收数据。例如:

// 创建一个带缓冲的 Channel,缓冲区大小为 3
ch := make(chan int, 3)

Go 信号量的实现思路

在 Go 语言中,虽然没有内置的信号量类型,但可以通过 Channel 和其他同步原语来实现信号量。基本思路是利用 Channel 的阻塞特性来模拟信号量的获取和释放操作。

使用 Channel 模拟信号量

一个简单的信号量实现可以使用一个带缓冲的 Channel,缓冲区的大小就是信号量的初始值。当一个 Goroutine 想要获取信号量时,它尝试从 Channel 接收数据。如果 Channel 中有数据(即信号量可用),接收操作立即返回,相当于获取了信号量;如果 Channel 为空,接收操作会阻塞,直到有其他 Goroutine 向 Channel 发送数据(即释放信号量)。

释放信号量则是向 Channel 发送数据。由于 Channel 是带缓冲的,发送操作在缓冲区未满时不会阻塞。

下面是一个简单的信号量实现示例:

package main

import (
    "fmt"
    "sync"
    "time"
)

// Semaphore 定义信号量类型
type Semaphore chan struct{}

// NewSemaphore 创建一个新的信号量
func NewSemaphore(count int) Semaphore {
    sem := make(Semaphore, count)
    for i := 0; i < count; i++ {
        sem <- struct{}{}
    }
    return sem
}

// Acquire 获取信号量
func (s Semaphore) Acquire() {
    <-s
}

// Release 释放信号量
func (s Semaphore) Release() {
    s <- struct{}{}
}

在上述代码中:

  1. Semaphore 被定义为一个 struct{} 类型的 Channel。struct{} 是一个空结构体,占用内存空间最小,适合用于这种只需要计数而不需要携带数据的场景。
  2. NewSemaphore 函数用于创建一个新的信号量,它初始化一个带缓冲的 Channel,并向其中填充 count 个空结构体,这些填充的元素就代表了可用的信号量。
  3. Acquire 方法通过从 Channel 接收数据来获取信号量。如果 Channel 为空,接收操作会阻塞,直到有信号量被释放。
  4. Release 方法通过向 Channel 发送数据来释放信号量。由于 Channel 是带缓冲的,只要缓冲区未满,发送操作不会阻塞。

使用信号量控制并发访问

示例场景:限制并发请求数量

假设我们有一个服务,它只能处理有限数量的并发请求。我们可以使用信号量来控制同时访问该服务的 Goroutine 数量。

package main

import (
    "fmt"
    "sync"
    "time"
)

// Semaphore 定义信号量类型
type Semaphore chan struct{}

// NewSemaphore 创建一个新的信号量
func NewSemaphore(count int) Semaphore {
    sem := make(Semaphore, count)
    for i := 0; i < count; i++ {
        sem <- struct{}{}
    }
    return sem
}

// Acquire 获取信号量
func (s Semaphore) Acquire() {
    <-s
}

// Release 释放信号量
func (s Semaphore) Release() {
    s <- struct{}{}
}

func worker(sem Semaphore, id int, wg *sync.WaitGroup) {
    defer wg.Done()
    sem.Acquire()
    defer sem.Release()

    fmt.Printf("Worker %d started\n", id)
    time.Sleep(2 * time.Second)
    fmt.Printf("Worker %d finished\n", id)
}

func main() {
    maxConcurrent := 3
    sem := NewSemaphore(maxConcurrent)
    var wg sync.WaitGroup

    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(sem, i, &wg)
    }

    wg.Wait()
}

在上述代码中:

  1. worker 函数模拟一个工作任务,它首先通过 sem.Acquire() 获取信号量,完成任务后通过 sem.Release() 释放信号量。
  2. main 函数中,我们创建了一个初始值为 3 的信号量,表示最多允许 3 个并发请求。然后启动 5 个 Goroutine 来模拟工作任务。
  3. sync.WaitGroup 用于等待所有 Goroutine 完成任务。

运行上述代码,你会看到每次最多有 3worker 同时执行,当有 worker 完成任务并释放信号量后,其他等待的 worker 才能获取信号量并开始执行。

信号量实现的优化与拓展

带超时的信号量获取

在实际应用中,有时我们不希望一个 Goroutine 无限期地等待获取信号量,而是希望在一定时间后超时。可以通过结合 select 语句和 time.After 函数来实现带超时的信号量获取。

package main

import (
    "fmt"
    "sync"
    "time"
)

// Semaphore 定义信号量类型
type Semaphore chan struct{}

// NewSemaphore 创建一个新的信号量
func NewSemaphore(count int) Semaphore {
    sem := make(Semaphore, count)
    for i := 0; i < count; i++ {
        sem <- struct{}{}
    }
    return sem
}

// AcquireWithTimeout 获取信号量并设置超时
func (s Semaphore) AcquireWithTimeout(timeout time.Duration) bool {
    select {
    case <-s:
        return true
    case <-time.After(timeout):
        return false
    }
}

// Release 释放信号量
func (s Semaphore) Release() {
    s <- struct{}{}
}

func worker(sem Semaphore, id int, wg *sync.WaitGroup) {
    defer wg.Done()
    if ok := sem.AcquireWithTimeout(1 * time.Second);!ok {
        fmt.Printf("Worker %d timed out\n", id)
        return
    }
    defer sem.Release()

    fmt.Printf("Worker %d started\n", id)
    time.Sleep(2 * time.Second)
    fmt.Printf("Worker %d finished\n", id)
}

func main() {
    maxConcurrent := 3
    sem := NewSemaphore(maxConcurrent)
    var wg sync.WaitGroup

    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(sem, i, &wg)
    }

    wg.Wait()
}

在上述代码中:

  1. AcquireWithTimeout 方法使用 select 语句监听两个 Channel:信号量 Channel stime.After(timeout) 返回的 Channel。
  2. 如果在 timeout 时间内从信号量 Channel s 接收到数据,说明成功获取信号量,返回 true;如果在 timeout 时间内从 time.After(timeout) Channel 接收到数据,说明获取信号量超时,返回 false

可重入信号量

可重入信号量允许同一个 Goroutine 多次获取信号量而不会造成死锁。在标准的信号量实现中,一个 Goroutine 多次获取信号量会导致阻塞,因为没有对应的释放操作来增加信号量的值。

实现可重入信号量需要记录当前获取信号量的 Goroutine 以及获取的次数。

package main

import (
    "fmt"
    "sync"
    "time"
    "runtime"
)

// ReentrantSemaphore 定义可重入信号量类型
type ReentrantSemaphore struct {
    sem    chan struct{}
    count  int
    owner  uint64
    acquireCount int
}

// NewReentrantSemaphore 创建一个新的可重入信号量
func NewReentrantSemaphore(count int) *ReentrantSemaphore {
    sem := &ReentrantSemaphore{
        sem:    make(chan struct{}, count),
        count:  count,
    }
    for i := 0; i < count; i++ {
        sem.sem <- struct{}{}
    }
    return sem
}

// Acquire 获取可重入信号量
func (rs *ReentrantSemaphore) Acquire() {
    goID := getGoroutineID()
    if rs.owner == goID {
        rs.acquireCount++
        return
    }
    <-rs.sem
    if rs.owner == 0 {
        rs.owner = goID
        rs.acquireCount = 1
    }
}

// Release 释放可重入信号量
func (rs *ReentrantSemaphore) Release() {
    goID := getGoroutineID()
    if rs.owner != goID {
        panic("Release called by non - owner")
    }
    rs.acquireCount--
    if rs.acquireCount == 0 {
        rs.owner = 0
        rs.sem <- struct{}{}
    }
}

func getGoroutineID() uint64 {
    var buf [64]byte
    n := runtime.Stack(buf[:], false)
    idField := string(buf[:n])
    fields := strings.Fields(idField)
    for i, f := range fields {
        if f == "goroutine" {
            if i+1 < len(fields) {
                id, err := strconv.ParseUint(fields[i+1], 10, 64)
                if err == nil {
                    return id
                }
            }
        }
    }
    return 0
}

func worker(rs *ReentrantSemaphore, id int, wg *sync.WaitGroup) {
    defer wg.Done()
    rs.Acquire()
    defer rs.Release()

    fmt.Printf("Worker %d started\n", id)
    time.Sleep(2 * time.Second)
    fmt.Printf("Worker %d finished\n", id)
}

func main() {
    maxConcurrent := 3
    rs := NewReentrantSemaphore(maxConcurrent)
    var wg sync.WaitGroup

    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(rs, i, &wg)
    }

    wg.Wait()
}

在上述代码中:

  1. ReentrantSemaphore 结构体增加了 owner 字段用于记录当前获取信号量的 Goroutine 的 ID,acquireCount 字段用于记录获取次数。
  2. Acquire 方法首先检查当前 Goroutine 是否是信号量的所有者,如果是,则直接增加获取次数;否则,从信号量 Channel 获取数据,并更新所有者和获取次数。
  3. Release 方法检查当前 Goroutine 是否是所有者,然后减少获取次数。如果获取次数为 0,则将所有者设置为 0 并释放信号量。

信号量与其他同步原语的比较

与 Mutex 的比较

  1. 功能
    • Mutex(互斥锁)主要用于保护共享资源,确保同一时间只有一个 Goroutine 可以访问该资源。它只有两种状态:锁定和未锁定。
    • 信号量 可以控制同时访问共享资源的 Goroutine 数量,其值可以是大于 1 的整数,允许多个 Goroutine 同时访问共享资源,只要数量不超过信号量的值。
  2. 应用场景
    • Mutex 适用于保护那些不能被并发访问的共享资源,例如一个全局变量的读写操作。
    • 信号量 适用于控制对有限资源的并发访问,如数据库连接池、线程池等场景,允许一定数量的并发访问。
  3. 实现复杂度
    • Mutex 的实现相对简单,Go 语言的 sync.Mutex 是基于操作系统的互斥原语实现的。
    • 信号量 在 Go 语言中没有内置类型,需要通过 Channel 等同步原语来实现,实现相对复杂一些。

与 WaitGroup 的比较

  1. 功能
    • WaitGroup 主要用于等待一组 Goroutine 完成任务。它通过 Add 方法增加等待的任务数量,Done 方法表示一个任务完成,Wait 方法阻塞当前 Goroutine 直到所有任务完成。
    • 信号量 用于控制对共享资源的并发访问,通过获取和释放操作来管理资源的使用。
  2. 应用场景
    • WaitGroup 常用于在主 Goroutine 中等待多个子 Goroutine 完成任务,例如在并行计算任务完成后进行结果汇总。
    • 信号量 用于解决资源竞争和并发控制问题,如限制对共享文件的并发写入次数。
  3. 实现原理
    • WaitGroup 内部通过一个计数器来记录未完成的任务数量,当计数器为 0 时,Wait 操作返回。
    • 信号量 通过 Channel 的阻塞和非阻塞操作来模拟信号量的获取和释放,实现对资源的控制。

总结

通过 Channel 和其他同步原语,我们可以在 Go 语言中灵活地实现信号量,并根据实际需求进行优化和拓展。信号量在并发编程中是一个非常强大的工具,能够有效地控制对共享资源的并发访问,解决资源竞争和同步协作问题。与其他同步原语如 Mutex 和 WaitGroup 相比,信号量具有独特的功能和应用场景。在实际项目中,根据具体需求选择合适的同步机制,可以提高程序的性能和稳定性。

希望通过本文的介绍和示例代码,你对 Go 语言中信号量的实现和应用有了更深入的理解。在实际开发中,不断实践和探索,能够更好地发挥信号量在并发编程中的作用。