MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Go使用调度器进行并发控制

2023-01-165.6k 阅读

Go语言并发编程基础

并发与并行的概念

在深入探讨Go调度器之前,我们先来明确并发(Concurrency)和并行(Parallelism)的概念。并发是指在同一时间段内,多个任务交替执行,宏观上看起来像是同时进行。它主要通过时间片轮转等方式,在单核CPU上模拟多任务执行的效果。例如,一个CPU核心在短时间内轮流处理多个任务,使得用户感觉这些任务是同时运行的。

而并行则是指在同一时刻,多个任务真正地同时执行,这需要多个CPU核心或多个处理器来支持。比如,一个多核CPU可以让不同的核心同时处理不同的任务。在Go语言中,我们主要利用并发来实现高效的程序设计,通过调度器在多个goroutine之间进行切换,实现并发执行。

goroutine:轻量级线程

Go语言实现并发的核心是goroutine。goroutine是一种轻量级的线程,与操作系统线程(OS线程)相比,它的创建和销毁开销非常小。我们可以轻松地创建成千上万的goroutine,而创建同样数量的OS线程会消耗大量的系统资源,甚至可能导致系统崩溃。

下面是一个简单的创建goroutine的示例代码:

package main

import (
    "fmt"
    "time"
)

func say(s string) {
    for i := 0; i < 5; i++ {
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func main() {
    go say("world")
    say("hello")
}

在上述代码中,go say("world")创建了一个新的goroutine来执行say("world")函数,而主函数中同时还在执行say("hello")。这两个函数调用看似同时在运行,这就是goroutine带来的并发效果。

通道(Channel):goroutine间通信

当多个goroutine同时运行时,它们之间往往需要进行数据交换和同步。通道(Channel)就是Go语言提供的用于goroutine间通信的机制。通道可以看作是一个管道,数据可以从一端发送,从另一端接收。

下面是一个简单的通道使用示例:

package main

import (
    "fmt"
)

func sum(s []int, c chan int) {
    sum := 0
    for _, v := range s {
        sum += v
    }
    c <- sum // 将计算结果发送到通道
}

func main() {
    s := []int{7, 2, 8, -9, 4, 0}

    c := make(chan int)
    go sum(s[:len(s)/2], c)
    go sum(s[len(s)/2:], c)
    x, y := <-c, <-c // 从通道接收数据

    fmt.Println(x, y, x+y)
}

在这个例子中,我们创建了一个通道c,并启动两个goroutine分别计算切片 s的前半部分和后半部分的和。然后通过通道将计算结果发送出来,主函数从通道接收结果并进行最终的计算。

Go调度器的设计与实现

M:N调度模型

Go调度器采用的是M:N调度模型,即M个用户级线程(goroutine)映射到N个内核级线程(OS线程)上。与传统的1:1调度模型(一个用户级线程对应一个内核级线程)相比,M:N模型在并发性能上有显著优势。

在1:1模型中,创建大量线程会消耗大量的系统资源,并且线程上下文切换的开销也较大。而M:N模型中,多个goroutine可以复用少量的OS线程,通过调度器在这些goroutine之间进行高效切换,大大减少了资源消耗和上下文切换开销。

G-M-P模型

Go调度器的核心是G-M-P模型,它由三个主要部分组成:G(goroutine)、M(machine,即OS线程)和P(processor)。

  1. G(goroutine):代表一个独立的执行单元,包含了执行的函数、调用栈等信息。每个goroutine都有自己的状态,如运行(running)、就绪(runnable)、阻塞(blocked)等。
  2. M(machine):对应一个OS线程,负责实际执行goroutine的代码。M从P的本地运行队列或者全局运行队列中获取G来执行。
  3. P(processor):处理器,它维护着一个本地运行队列(Local Run Queue),用于存放可运行的goroutine。P还负责管理M与G之间的调度关系,一个P可以绑定到一个M上,使得M在执行G时可以访问到P的本地资源。

下面我们通过一张简单的示意图来理解G-M-P模型: G-M-P模型示意图

在图中,我们可以看到多个P分别绑定到不同的M上,每个P都有自己的本地运行队列,存放着可运行的G。全局运行队列(Global Run Queue)也可以存放G,当某个P的本地运行队列空了时,会尝试从全局运行队列中获取G。

调度器的工作流程

  1. 创建goroutine:当我们使用go关键字创建一个新的goroutine时,这个goroutine会被放入到某个P的本地运行队列中。如果本地运行队列已满,goroutine会被放入全局运行队列。
  2. M获取goroutine:M在启动时会绑定到一个P上,然后不断地从P的本地运行队列中获取goroutine来执行。如果本地运行队列为空,M会尝试从全局运行队列或者其他P的本地运行队列中窃取(work - stealing)一些goroutine来执行。
  3. goroutine的状态转换
    • 运行态(running):当M从P的本地运行队列中获取到一个goroutine并开始执行时,这个goroutine就处于运行态。
    • 就绪态(runnable):当一个goroutine被创建或者等待的资源可用时,它会被放入P的本地运行队列,处于就绪态,等待M来执行。
    • 阻塞态(blocked):当一个goroutine执行到一些阻塞操作,如I/O操作、channel操作等时,它会进入阻塞态,此时M会将这个goroutine从运行态切换到阻塞态,并去执行其他就绪的goroutine。当阻塞的条件解除后,goroutine会重新回到就绪态,等待M再次执行。

Go调度器的并发控制

利用通道进行同步

在并发编程中,同步是非常重要的,它可以避免竞态条件(race condition)等问题。通道不仅可以用于goroutine间的数据传递,还可以用于同步。

例如,我们可以使用一个无缓冲通道来实现简单的同步:

package main

import (
    "fmt"
)

func worker(done chan bool) {
    fmt.Println("working...")
    fmt.Println("done")
    done <- true
}

func main() {
    done := make(chan bool)
    go worker(done)
    <-done
    fmt.Println("main function received done signal")
}

在这个例子中,worker函数在完成工作后,通过done通道发送一个信号。主函数在<-done处阻塞,直到接收到这个信号,从而实现了主函数与worker goroutine之间的同步。

互斥锁(Mutex)与读写锁(RWMutex)

当多个goroutine需要访问共享资源时,为了避免竞态条件,我们可以使用互斥锁(Mutex)。互斥锁保证在同一时刻只有一个goroutine可以访问共享资源。

下面是一个使用互斥锁的示例:

package main

import (
    "fmt"
    "sync"
)

var (
    counter int
    mu      sync.Mutex
)

func increment(wg *sync.WaitGroup) {
    defer wg.Done()
    mu.Lock()
    counter++
    mu.Unlock()
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go increment(&wg)
    }
    wg.Wait()
    fmt.Println("Final counter value:", counter)
}

在上述代码中,mu是一个互斥锁,increment函数在修改共享变量counter之前先获取锁,修改完成后释放锁,这样就保证了在同一时刻只有一个goroutine可以修改counter,避免了竞态条件。

读写锁(RWMutex)则是一种更细粒度的锁,它区分了读操作和写操作。允许多个goroutine同时进行读操作,但在写操作时,必须独占资源,以保证数据的一致性。

下面是一个读写锁的使用示例:

package main

import (
    "fmt"
    "sync"
)

var (
    data    = make(map[string]string)
    rwMutex sync.RWMutex
)

func read(key string, wg *sync.WaitGroup) {
    defer wg.Done()
    rwMutex.RLock()
    value := data[key]
    rwMutex.RUnlock()
    fmt.Printf("Read key %s, value %s\n", key, value)
}

func write(key, value string, wg *sync.WaitGroup) {
    defer wg.Done()
    rwMutex.Lock()
    data[key] = value
    rwMutex.Unlock()
    fmt.Printf("Write key %s, value %s\n", key, value)
}

func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    go write("name", "John", &wg)
    go read("name", &wg)
    wg.Wait()
}

在这个例子中,read函数使用读锁(RLock),允许多个读操作并发进行,而write函数使用写锁(Lock),保证写操作的原子性。

条件变量(Cond)

条件变量(Cond)通常与互斥锁一起使用,用于在满足特定条件时唤醒等待的goroutine。例如,当某个资源可用时,唤醒等待该资源的goroutine。

下面是一个条件变量的使用示例:

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    mu    sync.Mutex
    cond  *sync.Cond
    ready bool
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    mu.Lock()
    for!ready {
        cond.Wait()
    }
    fmt.Printf("Worker %d is working\n", id)
    mu.Unlock()
}

func main() {
    var wg sync.WaitGroup
    cond = sync.NewCond(&mu)
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }
    time.Sleep(2 * time.Second)
    mu.Lock()
    ready = true
    cond.Broadcast()
    mu.Unlock()
    wg.Wait()
}

在这个例子中,worker函数在ready条件为false时,通过cond.Wait()进入等待状态,并释放互斥锁。当main函数设置readytrue并调用cond.Broadcast()时,所有等待的worker goroutine会被唤醒,重新获取互斥锁并继续执行。

调度器优化与调优

调度器参数调优

Go调度器有一些参数可以进行调优,以适应不同的应用场景。例如,GOMAXPROCS环境变量可以设置同时运行的最大CPU核心数。默认情况下,GOMAXPROCS的值等于系统的CPU核心数。

我们可以通过以下方式设置GOMAXPROCS

package main

import (
    "fmt"
    "runtime"
)

func main() {
    numCPU := runtime.NumCPU()
    fmt.Println("Default GOMAXPROCS:", runtime.GOMAXPROCS(0))
    runtime.GOMAXPROCS(numCPU - 1)
    fmt.Println("New GOMAXPROCS:", runtime.GOMAXPROCS(0))
}

在上述代码中,我们首先获取系统的CPU核心数,然后通过runtime.GOMAXPROCS函数设置GOMAXPROCS的值为CPU核心数减1。

减少系统调用开销

在goroutine执行过程中,如果频繁进行系统调用,会导致调度器的上下文切换开销增大。例如,I/O操作通常会涉及系统调用。为了减少这种开销,我们可以尽量使用非阻塞的I/O操作,或者将I/O操作集中处理,减少系统调用的频率。

下面是一个使用非阻塞I/O操作的示例:

package main

import (
    "fmt"
    "net"
)

func main() {
    conn, err := net.Dial("tcp", "google.com:80")
    if err != nil {
        fmt.Println("Dial error:", err)
        return
    }
    defer conn.Close()

    conn.SetReadDeadline(time.Now().Add(1 * time.Second))
    buf := make([]byte, 1024)
    n, err := conn.Read(buf)
    if err != nil {
        if ne, ok := err.(net.Error); ok && ne.Timeout() {
            fmt.Println("Read timeout")
        } else {
            fmt.Println("Read error:", err)
        }
        return
    }
    fmt.Println("Read:", string(buf[:n]))
}

在这个例子中,我们通过conn.SetReadDeadline设置了读操作的超时时间,实现了非阻塞的读操作。这样可以避免在I/O操作上长时间阻塞,提高调度器的效率。

避免不必要的锁竞争

锁竞争会降低并发性能,因为在锁竞争时,多个goroutine需要等待锁的释放。为了避免不必要的锁竞争,我们可以采用以下几种方法:

  1. 减少锁的粒度:只在需要保护的关键代码段使用锁,而不是在整个函数中都使用锁。
  2. 使用无锁数据结构:对于一些特定的应用场景,可以使用无锁数据结构,如sync.Map,它在高并发环境下具有更好的性能。

下面是一个使用sync.Map的示例:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var mu sync.Map
    mu.Store("name", "John")
    value, ok := mu.Load("name")
    if ok {
        fmt.Println("Value:", value)
    }
    mu.Delete("name")
}

在这个例子中,sync.Map内部实现了无锁操作,在高并发环境下可以避免锁竞争,提高性能。

实际应用案例

Web服务器中的并发处理

在Web服务器开发中,Go语言的调度器可以高效地处理大量并发请求。例如,使用Go标准库中的net/http包来构建一个简单的Web服务器:

package main

import (
    "fmt"
    "net/http"
)

func handler(w http.ResponseWriter, r *http.Request) {
    fmt.Fprintf(w, "Hello, World!")
}

func main() {
    http.HandleFunc("/", handler)
    fmt.Println("Server is listening on :8080")
    http.ListenAndServe(":8080", nil)
}

在这个简单的Web服务器示例中,每一个HTTP请求都会由一个新的goroutine来处理。Go调度器会在这些goroutine之间进行高效切换,从而可以同时处理大量的并发请求。

分布式计算中的任务调度

在分布式计算场景下,我们可以利用Go调度器来管理和调度任务。例如,假设有一个分布式计算任务,需要在多个节点上并行计算一些数据。我们可以创建多个goroutine,每个goroutine负责一个节点的计算任务,通过通道来传递计算结果。

下面是一个简单的分布式计算示例:

package main

import (
    "fmt"
    "sync"
)

func worker(id int, data []int, resultChan chan int) {
    sum := 0
    for _, v := range data {
        sum += v
    }
    resultChan <- sum
}

func main() {
    data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
    numWorkers := 3
    chunkSize := (len(data) + numWorkers - 1) / numWorkers
    resultChan := make(chan int)
    var wg sync.WaitGroup

    for i := 0; i < numWorkers; i++ {
        start := i * chunkSize
        end := (i + 1) * chunkSize
        if end > len(data) {
            end = len(data)
        }
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            worker(id, data[start:end], resultChan)
        }(i)
    }

    go func() {
        wg.Wait()
        close(resultChan)
    }()

    totalSum := 0
    for sum := range resultChan {
        totalSum += sum
    }
    fmt.Println("Total sum:", totalSum)
}

在这个示例中,我们将数据分成多个块,每个块由一个goroutine进行计算。通过通道收集各个goroutine的计算结果,并最终得到总的计算结果。这种方式利用了Go调度器的并发能力,实现了高效的分布式计算。