Go基本原语与扩展原语
Go 基本原语
1. 并发原语 - goroutine
Go 语言的并发编程模型基于 goroutine,这是一种轻量级的线程实现。与传统线程相比,goroutine 的创建和销毁成本极低,使得可以在一台机器上轻松创建数以万计的 goroutine。
创建 goroutine:
通过 go
关键字即可创建一个新的 goroutine。例如,以下代码创建了一个简单的 goroutine 来打印字符串:
package main
import (
"fmt"
)
func printMessage() {
fmt.Println("Hello from goroutine")
}
func main() {
go printMessage()
fmt.Println("Main function")
}
在上述代码中,go printMessage()
启动了一个新的 goroutine 来执行 printMessage
函数。主函数并不会等待这个 goroutine 完成就继续执行,因此会先打印 “Main function”。
goroutine 调度: Go 的运行时系统包含一个调度器,负责管理和调度 goroutine。调度器采用 M:N 调度模型,即多个 goroutine 映射到多个操作系统线程上。这种模型允许在少量操作系统线程上高效运行大量 goroutine。例如,假设系统中有 1000 个 goroutine,但只使用了 10 个操作系统线程。调度器会在这些线程上复用 goroutine,根据 goroutine 的状态(运行、就绪、阻塞等)进行切换。
2. 同步原语 - mutex
在并发编程中,多个 goroutine 可能会同时访问共享资源,这可能导致数据竞争和不一致的问题。Mutex(互斥锁)是一种用于保护共享资源的同步原语,它保证在同一时刻只有一个 goroutine 能够访问共享资源。
使用 mutex:
package main
import (
"fmt"
"sync"
)
var (
counter int
mu sync.Mutex
)
func increment(wg *sync.WaitGroup) {
defer wg.Done()
mu.Lock()
counter++
mu.Unlock()
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go increment(&wg)
}
wg.Wait()
fmt.Println("Final counter value:", counter)
}
在这段代码中,mu
是一个 sync.Mutex
实例。increment
函数在修改 counter
变量之前调用 mu.Lock()
,这样就阻止了其他 goroutine 同时进入临界区(对 counter
的修改部分)。修改完成后,调用 mu.Unlock()
释放锁,允许其他 goroutine 获取锁并访问共享资源。如果没有 mutex
,多个 goroutine 同时修改 counter
会导致数据竞争,最终 counter
的值可能不是预期的 1000。
3. 同步原语 - 读写锁(RWMutex)
有时候,共享资源的读取操作远远多于写入操作。在这种情况下,使用普通的 Mutex
会限制性能,因为即使是读取操作也会独占锁。RWMutex
(读写锁)则解决了这个问题,它允许多个 goroutine 同时进行读取操作,但在写入时会独占锁。
使用 RWMutex:
package main
import (
"fmt"
"sync"
)
var (
data int
rwmu sync.RWMutex
)
func readData(wg *sync.WaitGroup) {
defer wg.Done()
rwmu.RLock()
fmt.Println("Read data:", data)
rwmu.RUnlock()
}
func writeData(wg *sync.WaitGroup, value int) {
defer wg.Done()
rwmu.Lock()
data = value
fmt.Println("Write data:", data)
rwmu.Unlock()
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go readData(&wg)
}
for i := 0; i < 2; i++ {
wg.Add(1)
go writeData(&wg, i*10)
}
wg.Wait()
}
在上述代码中,readData
函数使用 rwmu.RLock()
获取读锁,允许多个 goroutine 同时读取 data
。而 writeData
函数使用 rwmu.Lock()
获取写锁,在写入 data
时会阻止其他读或写操作。这样,在读取操作频繁的场景下,读写锁可以提高并发性能。
4. 通信原语 - channel
在 Go 语言中,提倡使用 “通信顺序进程”(CSP)模型进行并发编程,而 channel 是实现 CSP 模型的核心通信原语。Channel 用于在不同 goroutine 之间进行数据传递和同步。
创建和使用 channel:
package main
import (
"fmt"
)
func sendData(ch chan int) {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch)
}
func receiveData(ch chan int) {
for value := range ch {
fmt.Println("Received:", value)
}
}
func main() {
ch := make(chan int)
go sendData(ch)
receiveData(ch)
}
在这段代码中,首先创建了一个整型的 channel ch
。sendData
函数通过 ch <- i
将数据发送到 channel 中,而 receiveData
函数使用 for... range
循环从 channel 中接收数据。close(ch)
用于关闭 channel,这样接收端的 for... range
循环会在 channel 关闭且所有数据被接收后退出。
无缓冲和有缓冲 channel:
- 无缓冲 channel:创建时不指定容量,如
ch := make(chan int)
。在无缓冲 channel 中,发送操作和接收操作必须同时准备好,否则会发生阻塞。例如,ch <- 1
会阻塞,直到有其他 goroutine 执行<-ch
来接收数据。 - 有缓冲 channel:创建时指定容量,如
ch := make(chan int, 5)
。有缓冲 channel 允许在没有接收者的情况下,先发送一定数量的数据到缓冲区。只有当缓冲区满时,发送操作才会阻塞;只有当缓冲区为空时,接收操作才会阻塞。
Go 扩展原语
1. 条件变量(Cond)
Cond
是基于 Mutex
的更高级同步原语,它允许 goroutine 在满足特定条件时被唤醒。通常用于多个 goroutine 需要等待某个共享资源状态改变的场景。
使用 Cond:
package main
import (
"fmt"
"sync"
"time"
)
var (
ready bool
mu sync.Mutex
cond sync.Cond
)
func worker(wg *sync.WaitGroup) {
defer wg.Done()
mu.Lock()
for!ready {
cond.Wait()
}
fmt.Println("Worker is working")
mu.Unlock()
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go worker(&wg)
}
time.Sleep(2 * time.Second)
mu.Lock()
ready = true
cond.Broadcast()
mu.Unlock()
wg.Wait()
}
在上述代码中,cond
是一个 sync.Cond
实例,它基于 mu
这个 Mutex
。worker
函数在 ready
条件不满足时,通过 cond.Wait()
进入等待状态,并释放 mu
锁。当主函数中设置 ready
为 true
并调用 cond.Broadcast()
时,所有等待在 cond
上的 goroutine 会被唤醒。cond.Wait()
在被唤醒后会重新获取 mu
锁,然后继续执行后续代码。
2. 信号量(Semaphore)
虽然 Go 标准库没有直接提供信号量类型,但可以通过 channel 来模拟实现信号量。信号量用于控制同时访问共享资源的 goroutine 数量。
实现信号量:
package main
import (
"fmt"
"sync"
"time"
)
type Semaphore chan struct{}
func NewSemaphore(capacity int) Semaphore {
return make(Semaphore, capacity)
}
func (s Semaphore) Acquire() {
s <- struct{}{}
}
func (s Semaphore) Release() {
<-s
}
func worker(sem Semaphore, id int) {
sem.Acquire()
fmt.Printf("Worker %d started\n", id)
time.Sleep(2 * time.Second)
fmt.Printf("Worker %d finished\n", id)
sem.Release()
}
func main() {
sem := NewSemaphore(2)
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
worker(sem, id)
}(i)
}
wg.Wait()
}
在这段代码中,Semaphore
是一个基于 channel 的自定义类型。NewSemaphore
函数创建一个具有指定容量的信号量。Acquire
方法通过向 channel 发送一个空结构体来获取信号量,如果 channel 已满(即达到最大允许的并发数),则会阻塞。Release
方法从 channel 接收一个空结构体,释放信号量,允许其他 goroutine 获取。在 main
函数中,创建了一个容量为 2 的信号量,这意味着最多同时有 2 个 worker
goroutine 可以获取信号量并执行任务。
3. 原子操作
在某些情况下,对共享资源的简单操作(如整数的增减)可以使用原子操作来避免使用锁,从而提高性能。Go 语言的 sync/atomic
包提供了一系列原子操作函数。
使用原子操作:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
var counter int64
func increment(wg *sync.WaitGroup) {
defer wg.Done()
atomic.AddInt64(&counter, 1)
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go increment(&wg)
}
wg.Wait()
fmt.Println("Final counter value:", atomic.LoadInt64(&counter))
}
在上述代码中,atomic.AddInt64
函数以原子方式增加 counter
的值。这确保了即使在多个 goroutine 同时执行 increment
函数时,counter
的值也能正确更新,而无需使用 Mutex
。atomic.LoadInt64
函数用于安全地读取 counter
的值。原子操作适用于对简单数据类型的操作,在高并发场景下可以减少锁带来的开销。
4. 同步组(WaitGroup)
WaitGroup
用于等待一组 goroutine 完成。它内部维护一个计数器,通过 Add
方法增加计数,通过 Done
方法减少计数,Wait
方法会阻塞直到计数器归零。
使用 WaitGroup:
package main
import (
"fmt"
"sync"
"time"
)
func task(wg *sync.WaitGroup, id int) {
defer wg.Done()
fmt.Printf("Task %d started\n", id)
time.Sleep(time.Second)
fmt.Printf("Task %d finished\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go task(&wg, i)
}
fmt.Println("Waiting for tasks to complete...")
wg.Wait()
fmt.Println("All tasks completed")
}
在这段代码中,每个 task
函数在开始时调用 wg.Add(1)
增加 WaitGroup
的计数器,在结束时调用 wg.Done()
减少计数器。主函数通过 wg.Wait()
等待所有 task
函数完成。这样可以确保在所有 goroutine 执行完毕后,主函数才继续执行后续代码。
5. 上下文(Context)
Context
用于在 goroutine 之间传递截止时间、取消信号等信息。它在处理 HTTP 请求、控制并发操作等场景中非常有用。
使用 Context:
package main
import (
"context"
"fmt"
"time"
)
func longRunningTask(ctx context.Context) {
select {
case <-time.After(5 * time.Second):
fmt.Println("Task completed")
case <-ctx.Done():
fmt.Println("Task cancelled:", ctx.Err())
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
go longRunningTask(ctx)
time.Sleep(4 * time.Second)
}
在上述代码中,context.WithTimeout
创建了一个带有超时的 Context
,ctx
是这个 Context
实例,cancel
是取消函数。longRunningTask
函数通过 select
语句监听 ctx.Done()
通道。如果在任务完成前 Context
被取消(这里是因为超时),ctx.Done()
通道会收到信号,任务会被取消并打印相应信息。defer cancel()
确保在函数结束时无论是否超时都会调用取消函数,释放相关资源。
原语的综合应用
在实际的并发编程中,往往需要综合使用多种原语来实现复杂的功能。下面通过一个简单的示例来展示如何综合使用 goroutine、channel、mutex 和 WaitGroup。
package main
import (
"fmt"
"sync"
)
type Request struct {
ID int
// 其他请求数据
}
type Response struct {
ID int
// 其他响应数据
}
type RequestHandler struct {
requestCh chan Request
responseCh chan Response
mu sync.Mutex
requests map[int]Request
}
func NewRequestHandler() *RequestHandler {
return &RequestHandler{
requestCh: make(chan Request),
responseCh: make(chan Response),
requests: make(map[int]Request),
}
}
func (rh *RequestHandler) HandleRequests(wg *sync.WaitGroup) {
defer wg.Done()
for req := range rh.requestCh {
rh.mu.Lock()
rh.requests[req.ID] = req
rh.mu.Unlock()
// 模拟处理请求
time.Sleep(1 * time.Second)
resp := Response{ID: req.ID}
rh.responseCh <- resp
}
close(rh.responseCh)
}
func main() {
rh := NewRequestHandler()
var wg sync.WaitGroup
wg.Add(1)
go rh.HandleRequests(&wg)
requests := []Request{
{ID: 1},
{ID: 2},
{ID: 3},
}
for _, req := range requests {
rh.requestCh <- req
}
close(rh.requestCh)
for resp := range rh.responseCh {
fmt.Printf("Received response for request %d\n", resp.ID)
}
wg.Wait()
}
在这个示例中,RequestHandler
结构体封装了处理请求的逻辑。requestCh
用于接收请求,responseCh
用于发送响应。mutex
用于保护 requests
映射,因为可能有多个 goroutine 同时访问它。HandleRequests
函数在一个单独的 goroutine 中运行,从 requestCh
接收请求,处理后将响应发送到 responseCh
。主函数创建了 RequestHandler
实例并启动处理 goroutine,然后发送一系列请求,最后从 responseCh
接收响应。通过这种方式,综合运用了 goroutine 实现并发处理、channel 进行通信、mutex 保护共享资源以及 WaitGroup 等待处理 goroutine 完成。
原语使用的注意事项
- 死锁:在使用同步原语(如 mutex、cond 等)时,死锁是一个常见的问题。例如,多个 goroutine 相互等待对方释放锁就会导致死锁。编写代码时要仔细设计锁的获取和释放顺序,避免循环依赖。
- 性能问题:虽然原子操作和无锁数据结构可以提高性能,但过度使用可能会使代码难以理解和维护。在选择使用原子操作还是锁时,要根据具体的场景进行权衡。例如,对于复杂的数据结构操作,锁可能是更合适的选择,尽管它可能会带来一定的性能开销。
- channel 关闭:在使用 channel 时,要注意正确关闭 channel。如果没有正确关闭,可能会导致接收端的
for... range
循环永远阻塞。同时,多次关闭 channel 会导致运行时错误,因此要确保只在合适的地方关闭一次。 - 资源泄漏:在使用
Context
时,如果没有及时调用取消函数,可能会导致 goroutine 无法正确停止,从而造成资源泄漏。特别是在处理长时间运行的任务时,要确保Context
能被正确管理。
通过深入理解和合理运用 Go 的基本原语和扩展原语,可以编写出高效、安全的并发程序。在实际项目中,要根据具体的需求和场景选择合适的原语,并注意避免常见的问题,以充分发挥 Go 语言并发编程的优势。