MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Go基本原语与扩展原语

2021-10-033.3k 阅读

Go 基本原语

1. 并发原语 - goroutine

Go 语言的并发编程模型基于 goroutine,这是一种轻量级的线程实现。与传统线程相比,goroutine 的创建和销毁成本极低,使得可以在一台机器上轻松创建数以万计的 goroutine。

创建 goroutine: 通过 go 关键字即可创建一个新的 goroutine。例如,以下代码创建了一个简单的 goroutine 来打印字符串:

package main

import (
    "fmt"
)

func printMessage() {
    fmt.Println("Hello from goroutine")
}

func main() {
    go printMessage()
    fmt.Println("Main function")
}

在上述代码中,go printMessage() 启动了一个新的 goroutine 来执行 printMessage 函数。主函数并不会等待这个 goroutine 完成就继续执行,因此会先打印 “Main function”。

goroutine 调度: Go 的运行时系统包含一个调度器,负责管理和调度 goroutine。调度器采用 M:N 调度模型,即多个 goroutine 映射到多个操作系统线程上。这种模型允许在少量操作系统线程上高效运行大量 goroutine。例如,假设系统中有 1000 个 goroutine,但只使用了 10 个操作系统线程。调度器会在这些线程上复用 goroutine,根据 goroutine 的状态(运行、就绪、阻塞等)进行切换。

2. 同步原语 - mutex

在并发编程中,多个 goroutine 可能会同时访问共享资源,这可能导致数据竞争和不一致的问题。Mutex(互斥锁)是一种用于保护共享资源的同步原语,它保证在同一时刻只有一个 goroutine 能够访问共享资源。

使用 mutex

package main

import (
    "fmt"
    "sync"
)

var (
    counter int
    mu      sync.Mutex
)

func increment(wg *sync.WaitGroup) {
    defer wg.Done()
    mu.Lock()
    counter++
    mu.Unlock()
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go increment(&wg)
    }
    wg.Wait()
    fmt.Println("Final counter value:", counter)
}

在这段代码中,mu 是一个 sync.Mutex 实例。increment 函数在修改 counter 变量之前调用 mu.Lock(),这样就阻止了其他 goroutine 同时进入临界区(对 counter 的修改部分)。修改完成后,调用 mu.Unlock() 释放锁,允许其他 goroutine 获取锁并访问共享资源。如果没有 mutex,多个 goroutine 同时修改 counter 会导致数据竞争,最终 counter 的值可能不是预期的 1000。

3. 同步原语 - 读写锁(RWMutex)

有时候,共享资源的读取操作远远多于写入操作。在这种情况下,使用普通的 Mutex 会限制性能,因为即使是读取操作也会独占锁。RWMutex(读写锁)则解决了这个问题,它允许多个 goroutine 同时进行读取操作,但在写入时会独占锁。

使用 RWMutex

package main

import (
    "fmt"
    "sync"
)

var (
    data  int
    rwmu  sync.RWMutex
)

func readData(wg *sync.WaitGroup) {
    defer wg.Done()
    rwmu.RLock()
    fmt.Println("Read data:", data)
    rwmu.RUnlock()
}

func writeData(wg *sync.WaitGroup, value int) {
    defer wg.Done()
    rwmu.Lock()
    data = value
    fmt.Println("Write data:", data)
    rwmu.Unlock()
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go readData(&wg)
    }
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go writeData(&wg, i*10)
    }
    wg.Wait()
}

在上述代码中,readData 函数使用 rwmu.RLock() 获取读锁,允许多个 goroutine 同时读取 data。而 writeData 函数使用 rwmu.Lock() 获取写锁,在写入 data 时会阻止其他读或写操作。这样,在读取操作频繁的场景下,读写锁可以提高并发性能。

4. 通信原语 - channel

在 Go 语言中,提倡使用 “通信顺序进程”(CSP)模型进行并发编程,而 channel 是实现 CSP 模型的核心通信原语。Channel 用于在不同 goroutine 之间进行数据传递和同步。

创建和使用 channel

package main

import (
    "fmt"
)

func sendData(ch chan int) {
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch)
}

func receiveData(ch chan int) {
    for value := range ch {
        fmt.Println("Received:", value)
    }
}

func main() {
    ch := make(chan int)
    go sendData(ch)
    receiveData(ch)
}

在这段代码中,首先创建了一个整型的 channel chsendData 函数通过 ch <- i 将数据发送到 channel 中,而 receiveData 函数使用 for... range 循环从 channel 中接收数据。close(ch) 用于关闭 channel,这样接收端的 for... range 循环会在 channel 关闭且所有数据被接收后退出。

无缓冲和有缓冲 channel

  • 无缓冲 channel:创建时不指定容量,如 ch := make(chan int)。在无缓冲 channel 中,发送操作和接收操作必须同时准备好,否则会发生阻塞。例如,ch <- 1 会阻塞,直到有其他 goroutine 执行 <-ch 来接收数据。
  • 有缓冲 channel:创建时指定容量,如 ch := make(chan int, 5)。有缓冲 channel 允许在没有接收者的情况下,先发送一定数量的数据到缓冲区。只有当缓冲区满时,发送操作才会阻塞;只有当缓冲区为空时,接收操作才会阻塞。

Go 扩展原语

1. 条件变量(Cond)

Cond 是基于 Mutex 的更高级同步原语,它允许 goroutine 在满足特定条件时被唤醒。通常用于多个 goroutine 需要等待某个共享资源状态改变的场景。

使用 Cond

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    ready  bool
    mu     sync.Mutex
    cond   sync.Cond
)

func worker(wg *sync.WaitGroup) {
    defer wg.Done()
    mu.Lock()
    for!ready {
        cond.Wait()
    }
    fmt.Println("Worker is working")
    mu.Unlock()
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    time.Sleep(2 * time.Second)
    mu.Lock()
    ready = true
    cond.Broadcast()
    mu.Unlock()
    wg.Wait()
}

在上述代码中,cond 是一个 sync.Cond 实例,它基于 mu 这个 Mutexworker 函数在 ready 条件不满足时,通过 cond.Wait() 进入等待状态,并释放 mu 锁。当主函数中设置 readytrue 并调用 cond.Broadcast() 时,所有等待在 cond 上的 goroutine 会被唤醒。cond.Wait() 在被唤醒后会重新获取 mu 锁,然后继续执行后续代码。

2. 信号量(Semaphore)

虽然 Go 标准库没有直接提供信号量类型,但可以通过 channel 来模拟实现信号量。信号量用于控制同时访问共享资源的 goroutine 数量。

实现信号量

package main

import (
    "fmt"
    "sync"
    "time"
)

type Semaphore chan struct{}

func NewSemaphore(capacity int) Semaphore {
    return make(Semaphore, capacity)
}

func (s Semaphore) Acquire() {
    s <- struct{}{}
}

func (s Semaphore) Release() {
    <-s
}

func worker(sem Semaphore, id int) {
    sem.Acquire()
    fmt.Printf("Worker %d started\n", id)
    time.Sleep(2 * time.Second)
    fmt.Printf("Worker %d finished\n", id)
    sem.Release()
}

func main() {
    sem := NewSemaphore(2)
    var wg sync.WaitGroup
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            worker(sem, id)
        }(i)
    }
    wg.Wait()
}

在这段代码中,Semaphore 是一个基于 channel 的自定义类型。NewSemaphore 函数创建一个具有指定容量的信号量。Acquire 方法通过向 channel 发送一个空结构体来获取信号量,如果 channel 已满(即达到最大允许的并发数),则会阻塞。Release 方法从 channel 接收一个空结构体,释放信号量,允许其他 goroutine 获取。在 main 函数中,创建了一个容量为 2 的信号量,这意味着最多同时有 2 个 worker goroutine 可以获取信号量并执行任务。

3. 原子操作

在某些情况下,对共享资源的简单操作(如整数的增减)可以使用原子操作来避免使用锁,从而提高性能。Go 语言的 sync/atomic 包提供了一系列原子操作函数。

使用原子操作

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

var counter int64

func increment(wg *sync.WaitGroup) {
    defer wg.Done()
    atomic.AddInt64(&counter, 1)
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go increment(&wg)
    }
    wg.Wait()
    fmt.Println("Final counter value:", atomic.LoadInt64(&counter))
}

在上述代码中,atomic.AddInt64 函数以原子方式增加 counter 的值。这确保了即使在多个 goroutine 同时执行 increment 函数时,counter 的值也能正确更新,而无需使用 Mutexatomic.LoadInt64 函数用于安全地读取 counter 的值。原子操作适用于对简单数据类型的操作,在高并发场景下可以减少锁带来的开销。

4. 同步组(WaitGroup)

WaitGroup 用于等待一组 goroutine 完成。它内部维护一个计数器,通过 Add 方法增加计数,通过 Done 方法减少计数,Wait 方法会阻塞直到计数器归零。

使用 WaitGroup

package main

import (
    "fmt"
    "sync"
    "time"
)

func task(wg *sync.WaitGroup, id int) {
    defer wg.Done()
    fmt.Printf("Task %d started\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Task %d finished\n", id)
}

func main() {
    var wg sync.WaitGroup
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go task(&wg, i)
    }
    fmt.Println("Waiting for tasks to complete...")
    wg.Wait()
    fmt.Println("All tasks completed")
}

在这段代码中,每个 task 函数在开始时调用 wg.Add(1) 增加 WaitGroup 的计数器,在结束时调用 wg.Done() 减少计数器。主函数通过 wg.Wait() 等待所有 task 函数完成。这样可以确保在所有 goroutine 执行完毕后,主函数才继续执行后续代码。

5. 上下文(Context)

Context 用于在 goroutine 之间传递截止时间、取消信号等信息。它在处理 HTTP 请求、控制并发操作等场景中非常有用。

使用 Context

package main

import (
    "context"
    "fmt"
    "time"
)

func longRunningTask(ctx context.Context) {
    select {
    case <-time.After(5 * time.Second):
        fmt.Println("Task completed")
    case <-ctx.Done():
        fmt.Println("Task cancelled:", ctx.Err())
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    go longRunningTask(ctx)
    time.Sleep(4 * time.Second)
}

在上述代码中,context.WithTimeout 创建了一个带有超时的 Contextctx 是这个 Context 实例,cancel 是取消函数。longRunningTask 函数通过 select 语句监听 ctx.Done() 通道。如果在任务完成前 Context 被取消(这里是因为超时),ctx.Done() 通道会收到信号,任务会被取消并打印相应信息。defer cancel() 确保在函数结束时无论是否超时都会调用取消函数,释放相关资源。

原语的综合应用

在实际的并发编程中,往往需要综合使用多种原语来实现复杂的功能。下面通过一个简单的示例来展示如何综合使用 goroutine、channel、mutex 和 WaitGroup。

package main

import (
    "fmt"
    "sync"
)

type Request struct {
    ID int
    // 其他请求数据
}

type Response struct {
    ID int
    // 其他响应数据
}

type RequestHandler struct {
    requestCh  chan Request
    responseCh chan Response
    mu         sync.Mutex
    requests   map[int]Request
}

func NewRequestHandler() *RequestHandler {
    return &RequestHandler{
        requestCh:  make(chan Request),
        responseCh: make(chan Response),
        requests:   make(map[int]Request),
    }
}

func (rh *RequestHandler) HandleRequests(wg *sync.WaitGroup) {
    defer wg.Done()
    for req := range rh.requestCh {
        rh.mu.Lock()
        rh.requests[req.ID] = req
        rh.mu.Unlock()
        // 模拟处理请求
        time.Sleep(1 * time.Second)
        resp := Response{ID: req.ID}
        rh.responseCh <- resp
    }
    close(rh.responseCh)
}

func main() {
    rh := NewRequestHandler()
    var wg sync.WaitGroup
    wg.Add(1)
    go rh.HandleRequests(&wg)

    requests := []Request{
        {ID: 1},
        {ID: 2},
        {ID: 3},
    }

    for _, req := range requests {
        rh.requestCh <- req
    }
    close(rh.requestCh)

    for resp := range rh.responseCh {
        fmt.Printf("Received response for request %d\n", resp.ID)
    }

    wg.Wait()
}

在这个示例中,RequestHandler 结构体封装了处理请求的逻辑。requestCh 用于接收请求,responseCh 用于发送响应。mutex 用于保护 requests 映射,因为可能有多个 goroutine 同时访问它。HandleRequests 函数在一个单独的 goroutine 中运行,从 requestCh 接收请求,处理后将响应发送到 responseCh。主函数创建了 RequestHandler 实例并启动处理 goroutine,然后发送一系列请求,最后从 responseCh 接收响应。通过这种方式,综合运用了 goroutine 实现并发处理、channel 进行通信、mutex 保护共享资源以及 WaitGroup 等待处理 goroutine 完成。

原语使用的注意事项

  1. 死锁:在使用同步原语(如 mutex、cond 等)时,死锁是一个常见的问题。例如,多个 goroutine 相互等待对方释放锁就会导致死锁。编写代码时要仔细设计锁的获取和释放顺序,避免循环依赖。
  2. 性能问题:虽然原子操作和无锁数据结构可以提高性能,但过度使用可能会使代码难以理解和维护。在选择使用原子操作还是锁时,要根据具体的场景进行权衡。例如,对于复杂的数据结构操作,锁可能是更合适的选择,尽管它可能会带来一定的性能开销。
  3. channel 关闭:在使用 channel 时,要注意正确关闭 channel。如果没有正确关闭,可能会导致接收端的 for... range 循环永远阻塞。同时,多次关闭 channel 会导致运行时错误,因此要确保只在合适的地方关闭一次。
  4. 资源泄漏:在使用 Context 时,如果没有及时调用取消函数,可能会导致 goroutine 无法正确停止,从而造成资源泄漏。特别是在处理长时间运行的任务时,要确保 Context 能被正确管理。

通过深入理解和合理运用 Go 的基本原语和扩展原语,可以编写出高效、安全的并发程序。在实际项目中,要根据具体的需求和场景选择合适的原语,并注意避免常见的问题,以充分发挥 Go 语言并发编程的优势。