MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Go并发编程互斥机制的底层实现原理

2022-07-301.2k 阅读

Go语言并发编程基础

在深入探讨Go并发编程互斥机制的底层实现原理之前,我们先来回顾一下Go语言并发编程的一些基础概念。

Go语言从语言层面原生支持并发编程,通过 goroutine 来实现轻量级的线程。与传统线程相比,goroutine 非常轻量,创建和销毁的开销极小,使得我们可以轻松创建大量的 goroutine 来处理并发任务。

例如,以下是一个简单的 goroutine 示例:

package main

import (
    "fmt"
    "time"
)

func printNumbers() {
    for i := 1; i <= 5; i++ {
        fmt.Println("Number:", i)
        time.Sleep(100 * time.Millisecond)
    }
}

func printLetters() {
    for i := 'a'; i <= 'e'; i++ {
        fmt.Println("Letter:", string(i))
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    go printNumbers()
    go printLetters()

    time.Sleep(1000 * time.Millisecond)
    fmt.Println("Main function exiting")
}

在上述代码中,通过 go 关键字启动了两个 goroutine,分别执行 printNumbersprintLetters 函数。这两个 goroutine 并发执行,互不干扰。

而在并发编程中,多个 goroutine 可能会同时访问共享资源。如果没有适当的同步机制,就会导致数据竞争(data race)问题,使得程序出现不可预测的结果。为了解决这个问题,Go语言提供了多种同步机制,其中互斥锁(Mutex)是最常用的一种。

互斥锁(Mutex)的基本使用

互斥锁,全称为“相互排斥锁”,其作用是保证在同一时刻只有一个 goroutine 能够访问共享资源。在Go语言中,互斥锁由标准库 sync 包中的 Mutex 结构体表示。

下面是一个简单的使用互斥锁的示例:

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    counter int
    mu      sync.Mutex
)

func increment(wg *sync.WaitGroup) {
    defer wg.Done()
    mu.Lock()
    counter++
    mu.Unlock()
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go increment(&wg)
    }

    wg.Wait()
    fmt.Println("Final counter value:", counter)
}

在上述代码中,我们定义了一个全局变量 counter 作为共享资源,并使用 sync.Mutex 类型的变量 mu 作为互斥锁。在 increment 函数中,通过 mu.Lock() 方法获取锁,确保同一时刻只有一个 goroutine 能够执行 counter++ 操作,操作完成后通过 mu.Unlock() 方法释放锁。这样就避免了多个 goroutine 同时修改 counter 导致的数据竞争问题。

Go语言互斥锁的底层实现

1. 数据结构

在Go语言的标准库源码中,sync.Mutex 的定义位于 src/sync/mutex.go 文件中。其数据结构如下:

type Mutex struct {
    state int32
    sema  uint32
}
  • state:这是一个32位的整数,用于表示互斥锁的状态。它包含了多个标志位,用于记录锁的持有状态、等待队列状态等信息。
  • sema:这是一个信号量,用于阻塞和唤醒等待获取锁的 goroutine

2. 状态标志位

state 的不同位有不同的含义,通过位运算来操作和判断这些标志位:

  • Locked标志state 的最低位表示锁是否被持有。如果该位为1,则表示锁已被持有;为0则表示锁未被持有。
  • Woken标志:第1位表示是否有等待的 goroutine 被唤醒。当有 goroutine 被唤醒时,该位被设置为1。
  • Starving标志:第2位表示锁是否处于饥饿状态。如果该位为1,则表示锁处于饥饿状态,此时等待时间最长的 goroutine 会优先获取锁。
  • WaiterShift:这是一个常量,定义为3,用于计算等待队列中等待者的数量。通过右移 state 3位及以上的部分,可以得到等待者的数量。

3. Lock方法实现

Lock 方法用于获取互斥锁。其实现逻辑较为复杂,主要步骤如下:

func (m *Mutex) Lock() {
    // 快速路径:如果锁未被持有,尝试直接获取锁
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    // 慢速路径:获取锁失败,进入等待队列
    m.lockSlow()
}
  • 快速路径:首先通过 atomic.CompareAndSwapInt32 函数尝试原子地将 state 从0(未持有)设置为 mutexLocked(已持有)。如果设置成功,说明成功获取到锁,直接返回。同时,如果开启了竞态检测(race.Enabled),则调用 race.Acquire 记录获取锁的操作。
  • 慢速路径:如果快速路径获取锁失败,说明锁已被其他 goroutine 持有,此时调用 lockSlow 方法进入等待队列。

lockSlow 方法的实现如下:

func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
        // 如果锁未被持有且没有等待者,尝试获取锁
        if old&(mutexLocked|mutexStarving) == 0 &&
            atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
            if old&mutexStarving == 0 && awoke {
                runtime_Semrelease(&m.sema, false, 1)
            }
            if race.Enabled {
                race.Acquire(unsafe.Pointer(m))
            }
            return
        }
        // 检查是否应该进入饥饿模式
        if old&mutexStarving != 0 || old&mutexLocked != 0 {
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
        }
        // 更新等待者数量
        new := old
        if old&mutexStarving == 0 {
            new |= mutexWaiterShift
        }
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
            new &^= mutexWoken
        }
        // 设置新的状态
        atomic.StoreInt32(&m.state, new)
        // 阻塞当前goroutine
        runtime_SemacquireMutex(&m.sema, queueLifo, 1)
        old = m.state
        // 检查锁是否已经被释放
        if old&mutexStarving != 0 {
            if runtime_nanotime()-waitStartTime < starvationThresholdNs || old&(mutexLocked|mutexWoken|mutexWaiterShift) != 0 {
                continue
            }
            atomic.StoreInt32(&m.state, old&^mutexStarving|mutexLocked)
            runtime_Semrelease(&m.sema, false, 1)
        } else if old&(mutexLocked|mutexWoken) != 0 {
            continue
        }
        awoke = true
        iter++
    }
}
  • 尝试获取锁:在循环中,首先检查锁是否未被持有且没有等待者,如果是,则尝试通过 atomic.CompareAndSwapInt32 获取锁。如果获取成功,并且锁不在饥饿状态且当前 goroutine 已被唤醒,则通过 runtime_Semrelease 释放信号量,通知其他等待的 goroutine
  • 饥饿检测:根据等待时间是否超过 starvationThresholdNs(默认1毫秒)来判断是否进入饥饿模式。如果锁处于饥饿状态,等待时间最长的 goroutine 会优先获取锁。
  • 更新状态:根据当前状态更新 state,增加等待者数量,设置或清除相应的标志位。
  • 阻塞等待:通过 runtime_SemacquireMutex 阻塞当前 goroutine,等待锁的释放。当被唤醒后,再次检查锁的状态,决定是否继续等待。

4. Unlock方法实现

Unlock 方法用于释放互斥锁,其实现如下:

func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        m.unlockSlow(new)
    }
}
  • 快速路径:首先通过 atomic.AddInt32state 减去 mutexLocked,尝试释放锁。如果 new 为0,说明释放锁成功,直接返回。同时,如果开启了竞态检测,调用 race.Release 记录释放锁的操作。
  • 慢速路径:如果 new 不为0,说明还有其他 goroutine 在等待锁,调用 unlockSlow 方法处理。

unlockSlow 方法的实现如下:

func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        old := new
        for {
            // 如果没有等待者或者锁处于饥饿状态,直接返回
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexStarving|mutexWoken) != 0 {
                return
            }
            // 唤醒一个等待者
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
        // 处理饥饿状态下的解锁
        runtime_Semrelease(&m.sema, true, 1)
    }
}
  • 检查错误:首先检查是否尝试解锁未锁定的互斥锁,如果是,则抛出异常。
  • 非饥饿状态处理:如果锁不在饥饿状态,检查是否有等待者。如果有等待者,则唤醒一个等待的 goroutine,通过 runtime_Semrelease 释放信号量。
  • 饥饿状态处理:如果锁处于饥饿状态,直接通过 runtime_Semrelease 释放信号量,让等待时间最长的 goroutine 优先获取锁。

读写互斥锁(RWMutex)

除了普通的互斥锁 Mutex,Go语言还提供了读写互斥锁 RWMutex。读写互斥锁允许多个 goroutine 同时进行读操作,但只允许一个 goroutine 进行写操作。这样可以提高并发性能,特别是在读多写少的场景下。

1. 数据结构

RWMutex 的数据结构定义如下:

type RWMutex struct {
    w           Mutex
    writerSem   uint32
    readerSem   uint32
    readerCount int32
    readerWait  int32
}
  • w:一个普通的 Mutex,用于保护写操作。
  • writerSem:用于阻塞写操作的信号量。
  • readerSem:用于阻塞读操作的信号量。
  • readerCount:记录当前正在进行读操作的 goroutine 数量。
  • readerWait:记录等待写操作完成的读操作的 goroutine 数量。

2. 读锁(RLock)方法实现

读锁 RLock 方法允许多个 goroutine 同时获取读锁,其实现如下:

func (rw *RWMutex) RLock() {
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        runtime_Semacquire(&rw.readerSem)
    }
}
  • 首先通过 atomic.AddInt32readerCount 加1。如果 readerCount 变为负数,说明有写操作正在进行或者有写操作等待,此时通过 runtime_Semacquire 阻塞当前 goroutine,等待写操作完成。

3. 读锁(RUnlock)方法实现

读锁 RUnlock 方法用于释放读锁,其实现如下:

func (rw *RWMutex) RUnlock() {
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        if atomic.AddInt32(&rw.readerWait, -1) == 0 {
            runtime_Semrelease(&rw.writerSem, false, 1)
        }
    }
}
  • 通过 atomic.AddInt32readerCount 减1。如果 readerCount 小于0,说明之前有写操作等待,此时将 readerWait 减1。如果 readerWait 变为0,说明所有等待的读操作都已完成,通过 runtime_Semrelease 唤醒等待的写操作。

4. 写锁(Lock)方法实现

写锁 Lock 方法用于获取写锁,其实现如下:

func (rw *RWMutex) Lock() {
    rw.w.Lock()
    // 等待所有读操作完成
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders)
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_Semacquire(&rw.writerSem)
    }
}
  • 首先获取内部的 Mutex,防止其他写操作同时进行。然后将 readerCount 减去 rwmutexMaxReaders(一个较大的负数),表示有写操作开始。如果此时 readerCount 不为0,说明有读操作正在进行,将 readerWait 加上剩余的读操作数量,并通过 runtime_Semacquire 阻塞当前 goroutine,等待所有读操作完成。

5. 写锁(Unlock)方法实现

写锁 Unlock 方法用于释放写锁,其实现如下:

func (rw *RWMutex) Unlock() {
    // 恢复readerCount
    atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    rw.w.Unlock()
    // 唤醒所有等待的读操作
    runtime_Semrelease(&rw.readerSem, true, 0)
}
  • 首先将 readerCount 加上 rwmutexMaxReaders,恢复其正常状态。然后释放内部的 Mutex。最后通过 runtime_Semrelease 唤醒所有等待的读操作。

互斥机制的性能优化与注意事项

  1. 减少锁的粒度:在设计程序时,尽量将锁的保护范围缩小,只在访问共享资源的关键部分加锁。这样可以减少锁的竞争,提高并发性能。例如,如果有多个独立的共享资源,可以为每个资源使用单独的锁。
  2. 避免死锁:死锁是并发编程中常见的问题,通常是由于多个 goroutine 相互等待对方释放锁而导致的。为了避免死锁,要确保获取锁的顺序一致,避免嵌套锁的无序获取。
  3. 使用读写互斥锁:在读多写少的场景下,使用 RWMutex 可以显著提高性能。但要注意,写操作会阻塞所有的读操作,所以写操作应该尽量简短。
  4. 合理使用非阻塞数据结构:对于一些场景,可以使用非阻塞的数据结构(如无锁队列)来避免使用锁,进一步提高并发性能。但非阻塞数据结构的实现通常较为复杂,需要谨慎使用。

总结

Go语言的并发编程互斥机制为开发者提供了强大而灵活的工具来处理共享资源的并发访问。通过深入理解 MutexRWMutex 的底层实现原理,我们可以更加高效地编写并发程序,避免数据竞争和死锁等问题。在实际应用中,根据不同的场景选择合适的同步机制,并进行性能优化,是编写高性能并发程序的关键。同时,不断地学习和实践,积累并发编程的经验,也是非常重要的。希望本文能够帮助你对Go并发编程互斥机制有更深入的理解,并在实际项目中运用自如。