MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Go 语言协程(Goroutine)的优先级调度与自定义调度器

2021-12-227.8k 阅读

Go 语言协程基础

在 Go 语言中,协程(Goroutine)是一种轻量级的并发执行单元。与传统线程相比,创建和销毁 Goroutine 的开销极小,这使得 Go 能够轻松处理大量并发任务。

Goroutine 的创建与执行

通过 go 关键字可以非常简单地创建一个新的 Goroutine。以下是一个简单的示例:

package main

import (
    "fmt"
    "time"
)

func printNumbers() {
    for i := 1; i <= 5; i++ {
        fmt.Println("Number:", i)
        time.Sleep(100 * time.Millisecond)
    }
}

func printLetters() {
    for i := 'a'; i <= 'e'; i++ {
        fmt.Println("Letter:", string(i))
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    go printNumbers()
    go printLetters()

    time.Sleep(1 * time.Second)
    fmt.Println("Main function exiting")
}

在上述代码中,printNumbersprintLetters 函数分别在两个不同的 Goroutine 中执行。main 函数创建了这两个 Goroutine 后,会继续向下执行,最后通过 time.Sleep 等待 Goroutine 执行完毕。

调度器概述

Go 语言有一个内置的调度器,负责管理和调度 Goroutine。这个调度器采用 M:N 调度模型,即多个 Goroutine 映射到多个操作系统线程上。Go 调度器的核心组件包括 G(Goroutine)、M(操作系统线程)和 P(处理器)。P 管理着一组可运行的 G,M 从 P 的队列中获取 G 并执行。

优先级调度的需求

在许多实际应用场景中,我们希望对 Goroutine 进行优先级调度。例如,在一个网络服务器中,处理关键业务逻辑的请求可能需要比普通日志记录任务更高的优先级。

传统调度的不足

Go 语言默认的调度器是公平调度,它并不区分 Goroutine 的优先级。所有可运行的 Goroutine 都有平等的机会被调度执行。这在一些情况下可能无法满足应用的需求,例如:

  1. 资源竞争问题:当系统资源有限时,低优先级的任务可能长时间占用资源,导致高优先级任务无法及时执行。
  2. 实时性需求:对于一些对响应时间要求极高的任务,如实时监控数据处理,需要优先执行以保证数据的及时性。

实现优先级调度的方法

基于队列的优先级调度

一种常见的实现优先级调度的方法是使用多个队列,每个队列对应一个优先级。调度器首先从高优先级队列中获取 Goroutine 执行,只有当高优先级队列为空时,才会从较低优先级队列中获取。

以下是一个简单的基于队列的优先级调度的示例代码:

package main

import (
    "container/list"
    "fmt"
    "sync"
    "time"
)

type Task struct {
    priority int
    f        func()
}

type PriorityScheduler struct {
    queues []*list.List
    mu     sync.Mutex
}

func NewPriorityScheduler(numPriorities int) *PriorityScheduler {
    queues := make([]*list.List, numPriorities)
    for i := range queues {
        queues[i] = list.New()
    }
    return &PriorityScheduler{
        queues: queues,
    }
}

func (ps *PriorityScheduler) AddTask(priority int, task func()) {
    ps.mu.Lock()
    defer ps.mu.Unlock()
    if priority < 0 || priority >= len(ps.queues) {
        return
    }
    ps.queues[priority].PushBack(Task{priority, task})
}

func (ps *PriorityScheduler) Run() {
    for {
        for i := range ps.queues {
            ps.mu.Lock()
            element := ps.queues[i].Front()
            if element != nil {
                task := element.Value.(Task)
                ps.queues[i].Remove(element)
                ps.mu.Unlock()
                go task.f()
            } else {
                ps.mu.Unlock()
            }
        }
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    scheduler := NewPriorityScheduler(3)

    scheduler.AddTask(2, func() {
        fmt.Println("High priority task")
    })
    scheduler.AddTask(1, func() {
        fmt.Println("Medium priority task")
    })
    scheduler.AddTask(0, func() {
        fmt.Println("Low priority task")
    })

    go scheduler.Run()

    time.Sleep(500 * time.Millisecond)
    fmt.Println("Main function exiting")
}

在上述代码中,PriorityScheduler 结构体管理着多个任务队列,AddTask 方法用于将任务添加到相应优先级的队列中,Run 方法则不断从高优先级队列开始获取任务并执行。

基于信号量的优先级调度

另一种实现优先级调度的思路是使用信号量。高优先级的 Goroutine 可以获取更多的信号量,从而有更多机会被调度执行。

以下是一个简单的基于信号量的优先级调度示例代码:

package main

import (
    "fmt"
    "sync"
    "time"
)

type PrioritySemaphore struct {
    semaphores []*sync.Semaphore
    mu         sync.Mutex
}

func NewPrioritySemaphore(numPriorities int, initialValues []int) *PrioritySemaphore {
    semaphores := make([]*sync.Semaphore, numPriorities)
    for i := range semaphores {
        semaphores[i] = sync.NewSemaphore(int64(initialValues[i]))
    }
    return &PrioritySemaphore{
        semaphores: semaphores,
    }
}

func (ps *PrioritySemaphore) Execute(priority int, task func()) {
    ps.mu.Lock()
    if priority < 0 || priority >= len(ps.semaphores) {
        ps.mu.Unlock()
        return
    }
    sem := ps.semaphores[priority]
    ps.mu.Unlock()

    sem.Acquire(1)
    go func() {
        defer sem.Release(1)
        task()
    }()
}

func main() {
    semaphore := NewPrioritySemaphore(3, []int{3, 2, 1})

    semaphore.Execute(2, func() {
        fmt.Println("High priority task")
    })
    semaphore.Execute(1, func() {
        fmt.Println("Medium priority task")
    })
    semaphore.Execute(0, func() {
        fmt.Println("Low priority task")
    })

    time.Sleep(500 * time.Millisecond)
    fmt.Println("Main function exiting")
}

在这个示例中,PrioritySemaphore 结构体管理着多个信号量,Execute 方法根据任务的优先级获取相应的信号量,只有获取到信号量的任务才能被执行。

自定义调度器

除了实现优先级调度,Go 语言还允许我们自定义调度器,以满足特定的应用需求。

自定义调度器的设计要点

  1. 任务管理:自定义调度器需要管理一组可运行的 Goroutine,通常可以使用队列、堆等数据结构。
  2. 线程管理:调度器需要与操作系统线程进行交互,将 Goroutine 分配到不同的线程上执行。
  3. 调度策略:根据应用需求,确定调度 Goroutine 的策略,如先来先服务、优先级调度等。

简单自定义调度器示例

以下是一个简单的自定义调度器示例,它使用一个任务队列,并采用先来先服务的调度策略:

package main

import (
    "container/list"
    "fmt"
    "sync"
    "time"
)

type CustomTask struct {
    f func()
}

type CustomScheduler struct {
    taskQueue *list.List
    workerNum int
    mu        sync.Mutex
    wg        sync.WaitGroup
}

func NewCustomScheduler(workerNum int) *CustomScheduler {
    return &CustomScheduler{
        taskQueue: list.New(),
        workerNum: workerNum,
    }
}

func (cs *CustomScheduler) AddTask(task func()) {
    cs.mu.Lock()
    cs.taskQueue.PushBack(CustomTask{task})
    cs.mu.Unlock()
}

func (cs *CustomScheduler) Start() {
    for i := 0; i < cs.workerNum; i++ {
        cs.wg.Add(1)
        go func() {
            defer cs.wg.Done()
            for {
                cs.mu.Lock()
                element := cs.taskQueue.Front()
                if element != nil {
                    task := element.Value.(CustomTask)
                    cs.taskQueue.Remove(element)
                    cs.mu.Unlock()
                    task.f()
                } else {
                    cs.mu.Unlock()
                    time.Sleep(100 * time.Millisecond)
                }
            }
        }()
    }
}

func main() {
    scheduler := NewCustomScheduler(2)

    scheduler.AddTask(func() {
        fmt.Println("Task 1")
    })
    scheduler.AddTask(func() {
        fmt.Println("Task 2")
    })

    scheduler.Start()

    time.Sleep(500 * time.Millisecond)
    fmt.Println("Main function exiting")
}

在上述代码中,CustomScheduler 结构体管理着一个任务队列,并启动了指定数量的工作线程。AddTask 方法用于将任务添加到队列中,Start 方法则启动工作线程,从队列中获取任务并执行。

结合优先级调度的自定义调度器

我们可以将优先级调度的功能集成到自定义调度器中,以实现更强大的调度功能。

设计思路

  1. 任务结构:在任务结构中添加优先级字段。
  2. 任务队列:使用多个任务队列,每个队列对应一个优先级。
  3. 调度策略:调度器优先从高优先级队列中获取任务执行。

代码示例

package main

import (
    "container/list"
    "fmt"
    "sync"
    "time"
)

type PriorityTask struct {
    priority int
    f        func()
}

type PriorityCustomScheduler struct {
    queues    []*list.List
    workerNum int
    mu        sync.Mutex
    wg        sync.WaitGroup
}

func NewPriorityCustomScheduler(numPriorities, workerNum int) *PriorityCustomScheduler {
    queues := make([]*list.List, numPriorities)
    for i := range queues {
        queues[i] = list.New()
    }
    return &PriorityCustomScheduler{
        queues:    queues,
        workerNum: workerNum,
    }
}

func (pcs *PriorityCustomScheduler) AddTask(priority int, task func()) {
    pcs.mu.Lock()
    if priority < 0 || priority >= len(pcs.queues) {
        pcs.mu.Unlock()
        return
    }
    pcs.queues[priority].PushBack(PriorityTask{priority, task})
    pcs.mu.Unlock()
}

func (pcs *PriorityCustomScheduler) Start() {
    for i := 0; i < pcs.workerNum; i++ {
        pcs.wg.Add(1)
        go func() {
            defer pcs.wg.Done()
            for {
                for j := range pcs.queues {
                    pcs.mu.Lock()
                    element := pcs.queues[j].Front()
                    if element != nil {
                        task := element.Value.(PriorityTask)
                        pcs.queues[j].Remove(element)
                        pcs.mu.Unlock()
                        task.f()
                        break
                    } else {
                        pcs.mu.Unlock()
                    }
                }
                time.Sleep(100 * time.Millisecond)
            }
        }()
    }
}

func main() {
    scheduler := NewPriorityCustomScheduler(3, 2)

    scheduler.AddTask(2, func() {
        fmt.Println("High priority task")
    })
    scheduler.AddTask(1, func() {
        fmt.Println("Medium priority task")
    })
    scheduler.AddTask(0, func() {
        fmt.Println("Low priority task")
    })

    scheduler.Start()

    time.Sleep(500 * time.Millisecond)
    fmt.Println("Main function exiting")
}

在这个示例中,PriorityCustomScheduler 结构体管理着多个优先级队列,并启动了多个工作线程。AddTask 方法根据任务的优先级将任务添加到相应的队列中,Start 方法则按照优先级从高到低的顺序从队列中获取任务并执行。

实际应用场景

  1. 网络服务器:在处理不同类型的网络请求时,可以根据请求的重要性设置优先级。例如,处理用户登录请求的优先级可以高于普通的页面浏览请求。
  2. 实时数据处理:在实时监控系统中,处理实时数据的任务需要优先执行,以保证数据的及时性和准确性。
  3. 批处理任务:在执行批处理任务时,可以将关键任务设置为高优先级,以确保它们在资源有限的情况下优先完成。

注意事项

  1. 资源消耗:实现优先级调度和自定义调度器可能会增加系统的资源消耗,如内存和 CPU 开销。需要在设计和实现过程中进行性能优化。
  2. 死锁风险:在使用同步机制(如互斥锁、信号量等)时,要注意避免死锁的发生。
  3. 兼容性:自定义调度器可能与 Go 语言的标准库和其他第三方库存在兼容性问题,需要进行充分的测试和验证。

通过以上内容,我们详细介绍了 Go 语言中 Goroutine 的优先级调度和自定义调度器的实现方法。这些技术可以帮助我们更好地控制并发任务的执行顺序,提高应用的性能和响应速度。在实际应用中,需要根据具体的业务需求和系统环境选择合适的调度策略和实现方式。