MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Go RWMutex锁的读写冲突处理

2022-04-057.4k 阅读

Go RWMutex锁基础

RWMutex锁的定义

在Go语言的并发编程中,RWMutex(读写互斥锁)是一种非常重要的同步工具。它用于保护共享资源,允许多个读操作并发执行,但只允许一个写操作执行,并且写操作执行时不允许有读操作。RWMutexsync包中定义,其结构体定义如下:

type RWMutex struct {
    w           Mutex  // 用于写操作的互斥锁
    writerSem   uint32 // 写操作的信号量
    readerSem   uint32 // 读操作的信号量
    readerCount int32  // 当前读操作的数量
    readerWait  int32  // 等待写操作完成的读操作数量
}

这里面,w是一个普通的互斥锁,用于保护写操作。writerSemreaderSem是信号量,分别用于控制写操作和读操作的阻塞与唤醒。readerCount记录当前正在进行的读操作数量,readerWait记录等待写操作完成的读操作数量。

RWMutex锁的读操作

读操作通过调用RWMutexRLock方法来实现。当一个读操作调用RLock时,如果此时没有写操作正在进行(即readerCount没有被标记为负数,负数表示有写操作正在等待),那么该读操作可以直接执行,并将readerCount加1。代码示例如下:

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    data  int
    rwmu  sync.RWMutex
    wg    sync.WaitGroup
)

func read(id int) {
    defer wg.Done()
    rwmu.RLock()
    fmt.Printf("Reader %d is reading, data = %d\n", id, data)
    time.Sleep(time.Millisecond * 100)
    rwmu.RUnlock()
}

在这个示例中,read函数模拟了一个读操作。它首先调用RLock方法获取读锁,然后读取共享变量data的值并打印,最后调用RUnlock方法释放读锁。

RWMutex锁的写操作

写操作通过调用RWMutexLock方法来实现。当一个写操作调用Lock时,它会先将readerCount减去一个特定的值(通常是rwmutexMaxReaders),这会导致后续的读操作阻塞。然后它会获取w这个互斥锁,从而保证只有一个写操作可以执行。代码示例如下:

func write(id int, value int) {
    defer wg.Done()
    rwmu.Lock()
    data = value
    fmt.Printf("Writer %d is writing, data = %d\n", id, data)
    time.Sleep(time.Millisecond * 100)
    rwmu.Unlock()
}

在这个示例中,write函数模拟了一个写操作。它首先调用Lock方法获取写锁,然后修改共享变量data的值并打印,最后调用Unlock方法释放写锁。

读写冲突场景分析

读 - 读冲突

在使用RWMutex锁的情况下,读 - 读冲突是不会发生的。因为多个读操作可以同时获取读锁并访问共享资源。例如,我们可以启动多个读操作的协程:

func main() {
    data = 100
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go read(i)
    }
    wg.Wait()
}

在这个main函数中,我们启动了三个读操作的协程。由于读操作之间不会相互阻塞,所以它们可以并发执行,并且都能顺利读取到共享变量data的值。

读 - 写冲突

读 - 写冲突是需要重点关注的场景。当有写操作正在进行时,读操作会被阻塞。例如,我们先启动一个写操作,然后启动几个读操作:

func main() {
    wg.Add(1)
    go write(1, 200)

    time.Sleep(time.Millisecond * 50)

    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go read(i)
    }
    wg.Wait()
}

在这个示例中,我们先启动了一个写操作协程write(1, 200),然后在短暂延迟后启动了三个读操作协程。由于写操作先获取了写锁,读操作在调用RLock时会被阻塞,直到写操作完成并释放写锁。

写 - 写冲突

写 - 写冲突同样是不允许的。当一个写操作获取了写锁后,其他写操作调用Lock方法时会被阻塞。例如:

func main() {
    wg.Add(1)
    go write(1, 200)

    time.Sleep(time.Millisecond * 50)

    wg.Add(1)
    go write(2, 300)

    wg.Wait()
}

在这个示例中,我们先启动了一个写操作协程write(1, 200),然后在短暂延迟后启动了另一个写操作协程write(2, 300)。第二个写操作在调用Lock时会被阻塞,直到第一个写操作完成并释放写锁。

读写冲突处理策略

优先读策略

在某些场景下,读操作的频率可能远高于写操作,并且读操作不会修改共享资源,所以我们希望优先保证读操作的性能。在Go语言的RWMutex实现中,默认就是优先读策略。因为只要没有写操作正在进行或者等待,读操作就可以并发执行。我们可以通过调整读操作和写操作的启动顺序以及数量来观察这种策略的效果。例如:

func main() {
    for i := 1; i <= 10; i++ {
        wg.Add(1)
        go read(i)
    }

    time.Sleep(time.Millisecond * 50)

    wg.Add(1)
    go write(1, 200)

    wg.Wait()
}

在这个示例中,我们先启动了10个读操作协程,然后在短暂延迟后启动了一个写操作协程。由于读操作优先,10个读操作可以并发执行,而写操作会在所有读操作完成后才会执行。

优先写策略

有时候,写操作可能具有更高的优先级,例如写操作是对重要配置的更新。为了实现优先写策略,我们可以在写操作前增加一些逻辑,让读操作等待写操作完成。一种简单的实现方式是使用一个额外的信号量来控制读操作的启动。示例代码如下:

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    data  int
    rwmu  sync.RWMutex
    wg    sync.WaitGroup
    writeSem = make(chan struct{}, 1)
)

func read(id int) {
    defer wg.Done()
    <-writeSem
    rwmu.RLock()
    fmt.Printf("Reader %d is reading, data = %d\n", id, data)
    time.Sleep(time.Millisecond * 100)
    rwmu.RUnlock()
    writeSem <- struct{}{}
}

func write(id int, value int) {
    defer wg.Done()
    writeSem <- struct{}{}
    rwmu.Lock()
    data = value
    fmt.Printf("Writer %d is writing, data = %d\n", id, data)
    time.Sleep(time.Millisecond * 100)
    rwmu.Unlock()
    <-writeSem
}

func main() {
    data = 100
    wg.Add(1)
    go write(1, 200)

    time.Sleep(time.Millisecond * 50)

    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go read(i)
    }
    wg.Wait()
}

在这个示例中,我们使用了一个writeSem通道来控制读操作。写操作在获取写锁前先向writeSem通道发送一个信号,然后读操作在获取读锁前先从writeSem通道接收信号。这样就保证了写操作优先执行,读操作会等待写操作完成后才会执行。

公平策略

公平策略是指读操作和写操作按照请求的顺序依次执行,避免某些操作长时间等待。实现公平策略相对复杂一些,我们可以使用一个队列来记录操作的请求顺序。示例代码如下:

package main

import (
    "container/list"
    "fmt"
    "sync"
    "time"
)

type Operation struct {
    isWrite bool
    id      int
    value   int
    done    chan struct{}
}

var (
    data  int
    rwmu  sync.RWMutex
    wg    sync.WaitGroup
    queue = list.New()
    mutex sync.Mutex
)

func read(id int) {
    done := make(chan struct{})
    op := Operation{isWrite: false, id: id, done: done}
    mutex.Lock()
    queue.PushBack(op)
    mutex.Unlock()

    <-done
    rwmu.RLock()
    fmt.Printf("Reader %d is reading, data = %d\n", id, data)
    time.Sleep(time.Millisecond * 100)
    rwmu.RUnlock()
    wg.Done()
}

func write(id int, value int) {
    done := make(chan struct{})
    op := Operation{isWrite: true, id: id, value: value, done: done}
    mutex.Lock()
    queue.PushBack(op)
    mutex.Unlock()

    <-done
    rwmu.Lock()
    data = value
    fmt.Printf("Writer %d is writing, data = %d\n", id, data)
    time.Sleep(time.Millisecond * 100)
    rwmu.Unlock()
    wg.Done()
}

func scheduler() {
    for {
        mutex.Lock()
        if queue.Len() == 0 {
            mutex.Unlock()
            time.Sleep(time.Millisecond * 10)
            continue
        }
        op := queue.Front().Value.(Operation)
        queue.Remove(queue.Front())
        mutex.Unlock()

        if op.isWrite {
            write(op.id, op.value)
        } else {
            read(op.id)
        }
        close(op.done)
    }
}

func main() {
    data = 100
    go scheduler()

    wg.Add(1)
    go write(1, 200)

    time.Sleep(time.Millisecond * 50)

    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go read(i)
    }
    wg.Wait()
}

在这个示例中,我们定义了一个Operation结构体来表示读操作或写操作,并使用一个链表queue来记录操作请求。scheduler函数负责按照队列顺序依次执行操作,从而实现公平策略。

RWMutex锁在实际项目中的应用

缓存场景

在缓存系统中,读操作通常远远多于写操作。例如,一个简单的内存缓存实现如下:

package main

import (
    "fmt"
    "sync"
)

type Cache struct {
    data map[string]interface{}
    rwmu sync.RWMutex
}

func NewCache() *Cache {
    return &Cache{
        data: make(map[string]interface{}),
    }
}

func (c *Cache) Get(key string) (interface{}, bool) {
    c.rwmu.RLock()
    value, exists := c.data[key]
    c.rwmu.RUnlock()
    return value, exists
}

func (c *Cache) Set(key string, value interface{}) {
    c.rwmu.Lock()
    c.data[key] = value
    c.rwmu.Unlock()
}

func main() {
    cache := NewCache()
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            cache.Set(fmt.Sprintf("key%d", id), id)
        }(i)
    }

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            value, exists := cache.Get(fmt.Sprintf("key%d", id))
            if exists {
                fmt.Printf("Read key%d, value = %d\n", id, value)
            }
        }(i)
    }

    wg.Wait()
}

在这个缓存示例中,Get方法是读操作,Set方法是写操作。由于读操作频繁,使用RWMutex锁可以保证多个读操作并发执行,提高缓存的读取性能。

配置文件加载场景

在应用程序中,配置文件通常只在启动时加载一次,后续主要是读操作。例如,一个简单的配置文件加载器实现如下:

package main

import (
    "fmt"
    "sync"
)

type Config struct {
    serverAddr string
    databaseURL string
    rwmu       sync.RWMutex
}

func NewConfig() *Config {
    return &Config{
        serverAddr:  "127.0.0.1:8080",
        databaseURL: "mongodb://localhost:27017",
    }
}

func (c *Config) GetServerAddr() string {
    c.rwmu.RLock()
    addr := c.serverAddr
    c.rwmu.RUnlock()
    return addr
}

func (c *Config) GetDatabaseURL() string {
    c.rwmu.RLock()
    url := c.databaseURL
    c.rwmu.RUnlock()
    return url
}

func (c *Config) UpdateServerAddr(addr string) {
    c.rwmu.Lock()
    c.serverAddr = addr
    c.rwmu.Unlock()
}

func (c *Config) UpdateDatabaseURL(url string) {
    c.rwmu.Lock()
    c.databaseURL = url
    c.rwmu.Unlock()
}

func main() {
    config := NewConfig()
    var wg sync.WaitGroup

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            fmt.Printf("ServerAddr = %s\n", config.GetServerAddr())
        }()
    }

    wg.Add(1)
    go func() {
        defer wg.Done()
        config.UpdateServerAddr("192.168.1.1:8080")
    }()

    wg.Wait()
}

在这个配置文件加载器示例中,GetServerAddrGetDatabaseURL方法是读操作,UpdateServerAddrUpdateDatabaseURL方法是写操作。使用RWMutex锁可以确保在配置更新时,读操作不会受到影响,同时保证写操作的原子性。

RWMutex锁的性能优化

减少锁的粒度

减少锁的粒度是提高性能的一种有效方法。例如,在一个复杂的数据结构中,如果可以将其划分为多个独立的部分,每个部分使用单独的RWMutex锁进行保护,那么读操作和写操作就可以在不同的部分并发执行。假设我们有一个包含多个子数据结构的CompositeData结构体:

package main

import (
    "fmt"
    "sync"
)

type SubData1 struct {
    value int
    rwmu  sync.RWMutex
}

type SubData2 struct {
    value string
    rwmu  sync.RWMutex
}

type CompositeData struct {
    subData1 SubData1
    subData2 SubData2
}

func (c *CompositeData) GetSubData1() int {
    c.subData1.rwmu.RLock()
    value := c.subData1.value
    c.subData1.rwmu.RUnlock()
    return value
}

func (c *CompositeData) SetSubData1(newValue int) {
    c.subData1.rwmu.Lock()
    c.subData1.value = newValue
    c.subData1.rwmu.Unlock()
}

func (c *CompositeData) GetSubData2() string {
    c.subData2.rwmu.RLock()
    value := c.subData2.value
    c.subData2.rwmu.RUnlock()
    return value
}

func (c *CompositeData) SetSubData2(newValue string) {
    c.subData2.rwmu.Lock()
    c.subData2.value = newValue
    c.subData2.rwmu.Unlock()
}

func main() {
    compositeData := CompositeData{
        subData1: SubData1{value: 100},
        subData2: SubData2{value: "initial"},
    }

    var wg sync.WaitGroup

    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            compositeData.SetSubData1(compositeData.GetSubData1() + 1)
        }()
    }

    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            fmt.Println("SubData2 =", compositeData.GetSubData2())
        }()
    }

    wg.Wait()
}

在这个示例中,CompositeData结构体包含两个子数据结构SubData1SubData2,每个子数据结构都有自己的RWMutex锁。这样,对SubData1的操作和对SubData2的操作可以并发执行,提高了整体性能。

避免不必要的锁操作

在代码中,要尽量避免不必要的锁操作。例如,如果某些操作不需要修改共享资源,并且在该操作执行期间不会有其他协程修改共享资源,那么就可以不使用锁。假设我们有一个只读的计算函数:

package main

import (
    "fmt"
    "sync"
)

type Data struct {
    value int
    rwmu  sync.RWMutex
}

func (d *Data) ReadOnlyCompute() int {
    // 这里不需要获取锁,因为该操作是只读的
    return d.value * 2
}

func main() {
    data := Data{value: 10}
    var wg sync.WaitGroup

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            result := data.ReadOnlyCompute()
            fmt.Println("Result =", result)
        }()
    }

    wg.Wait()
}

在这个示例中,ReadOnlyCompute函数是一个只读操作,不需要获取锁。这样可以减少锁的竞争,提高性能。

使用读写锁池

在高并发场景下,可以考虑使用读写锁池来复用锁资源,减少锁的创建和销毁开销。虽然Go语言本身没有提供内置的读写锁池,但我们可以自己实现一个简单的锁池。示例代码如下:

package main

import (
    "fmt"
    "sync"
)

type RWMutexPool struct {
    pool sync.Pool
}

func NewRWMutexPool() *RWMutexPool {
    return &RWMutexPool{
        pool: sync.Pool{
            New: func() interface{} {
                return &sync.RWMutex{}
            },
        },
    }
}

func (p *RWMutexPool) Get() *sync.RWMutex {
    return p.pool.Get().(*sync.RWMutex)
}

func (p *RWMutexPool) Put(mu *sync.RWMutex) {
    p.pool.Put(mu)
}

func main() {
    pool := NewRWMutexPool()
    mu1 := pool.Get()
    mu2 := pool.Get()

    mu1.RLock()
    fmt.Println("mu1 is locked for reading")
    mu1.RUnlock()

    mu2.Lock()
    fmt.Println("mu2 is locked for writing")
    mu2.Unlock()

    pool.Put(mu1)
    pool.Put(mu2)
}

在这个示例中,我们定义了一个RWMutexPool结构体来管理读写锁池。Get方法从池中获取一个锁,Put方法将锁放回池中。通过这种方式,可以复用锁资源,提高性能。

RWMutex锁使用中的常见问题及解决方法

死锁问题

死锁是并发编程中常见的问题之一。在使用RWMutex锁时,如果锁的获取顺序不当,就可能导致死锁。例如:

package main

import (
    "fmt"
    "sync"
)

var (
    mu1 sync.RWMutex
    mu2 sync.RWMutex
)

func deadlock1() {
    mu1.Lock()
    fmt.Println("deadlock1: mu1 locked")
    mu2.Lock()
    fmt.Println("deadlock1: mu2 locked")
    mu2.Unlock()
    mu1.Unlock()
}

func deadlock2() {
    mu2.Lock()
    fmt.Println("deadlock2: mu2 locked")
    mu1.Lock()
    fmt.Println("deadlock2: mu1 locked")
    mu1.Unlock()
    mu2.Unlock()
}

func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        deadlock1()
    }()
    go func() {
        defer wg.Done()
        deadlock2()
    }()
    wg.Wait()
}

在这个示例中,deadlock1deadlock2两个函数在获取锁的顺序上不一致,导致死锁。解决方法是确保所有协程按照相同的顺序获取锁。例如:

package main

import (
    "fmt"
    "sync"
)

var (
    mu1 sync.RWMutex
    mu2 sync.RWMutex
)

func fixed1() {
    mu1.Lock()
    fmt.Println("fixed1: mu1 locked")
    mu2.Lock()
    fmt.Println("fixed1: mu2 locked")
    mu2.Unlock()
    mu1.Unlock()
}

func fixed2() {
    mu1.Lock()
    fmt.Println("fixed2: mu1 locked")
    mu2.Lock()
    fmt.Println("fixed2: mu2 locked")
    mu2.Unlock()
    mu1.Unlock()
}

func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        fixed1()
    }()
    go func() {
        defer wg.Done()
        fixed2()
    }()
    wg.Wait()
}

在修改后的代码中,fixed1fixed2函数都按照先获取mu1锁再获取mu2锁的顺序进行操作,避免了死锁。

锁粒度不当问题

如前文所述,锁粒度不当会影响性能。如果锁的粒度太大,会导致并发性能下降;如果锁的粒度太小,会增加锁的管理开销。例如,在一个简单的计数器实现中,如果使用一个大粒度的锁:

package main

import (
    "fmt"
    "sync"
)

type Counter struct {
    value int
    rwmu  sync.RWMutex
}

func (c *Counter) Increment() {
    c.rwmu.Lock()
    c.value++
    c.rwmu.Unlock()
}

func (c *Counter) Decrement() {
    c.rwmu.Lock()
    c.value--
    c.rwmu.Unlock()
}

func (c *Counter) GetValue() int {
    c.rwmu.RLock()
    value := c.value
    c.rwmu.RUnlock()
    return value
}

func main() {
    counter := Counter{}
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Increment()
        }()
    }

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Decrement()
        }()
    }

    wg.Wait()
    fmt.Println("Final value =", counter.GetValue())
}

在这个示例中,IncrementDecrement方法对整个Counter结构体加锁,锁粒度较大。如果将其改为按操作类型分别加锁,可以提高并发性能:

package main

import (
    "fmt"
    "sync"
)

type Counter struct {
    value int
    incRwmu  sync.RWMutex
    decRwmu  sync.RWMutex
}

func (c *Counter) Increment() {
    c.incRwmu.Lock()
    c.value++
    c.incRwmu.Unlock()
}

func (c *Counter) Decrement() {
    c.decRwmu.Lock()
    c.value--
    c.decRwmu.Unlock()
}

func (c *Counter) GetValue() int {
    c.incRwmu.RLock()
    c.decRwmu.RLock()
    value := c.value
    c.decRwmu.RUnlock()
    c.incRwmu.RUnlock()
    return value
}

func main() {
    counter := Counter{}
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Increment()
        }()
    }

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Decrement()
        }()
    }

    wg.Wait()
    fmt.Println("Final value =", counter.GetValue())
}

在修改后的代码中,IncrementDecrement方法分别使用不同的锁,减少了锁的竞争,提高了并发性能。

忘记释放锁

忘记释放锁也是一个常见问题。例如,在一个函数中获取了锁,但由于异常或逻辑错误没有释放锁,会导致其他协程一直等待。在Go语言中,可以使用defer关键字来确保锁一定会被释放。例如:

package main

import (
    "fmt"
    "sync"
)

var rwmu sync.RWMutex

func forgetUnlock() {
    rwmu.Lock()
    fmt.Println("forgetUnlock: locked")
    // 这里忘记释放锁
}

func correctUnlock() {
    rwmu.Lock()
    defer rwmu.Unlock()
    fmt.Println("correctUnlock: locked")
}

func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        forgetUnlock()
    }()
    go func() {
        defer wg.Done()
        correctUnlock()
    }()
    wg.Wait()
}

forgetUnlock函数中,由于没有释放锁,会导致后续对rwmu锁的操作被阻塞。而在correctUnlock函数中,使用defer关键字确保了锁一定会被释放。

总结

通过对Go语言中RWMutex锁的读写冲突处理的深入探讨,我们了解了其内部原理、各种冲突场景、处理策略、实际应用、性能优化以及常见问题和解决方法。在实际的并发编程中,合理使用RWMutex锁并根据具体场景选择合适的策略至关重要。希望本文的内容能帮助读者更好地理解和运用RWMutex锁,写出高效、稳定的并发代码。