MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Go函数异步调用实例讲解

2022-12-021.8k 阅读

一、Go 语言并发编程基础

在深入探讨 Go 函数异步调用之前,我们先来回顾一下 Go 语言并发编程的一些基础概念。

1.1 协程(Goroutine)

Go 语言中,协程(Goroutine)是一种轻量级的线程。与操作系统线程相比,创建和销毁 Goroutine 的开销极小。在传统的编程语言中,创建线程的资源开销较大,线程数量过多时会严重影响系统性能。而 Goroutine 在这方面具有极大的优势,一个程序可以轻松创建数以万计的 Goroutine。

创建一个 Goroutine 非常简单,只需要在函数调用前加上 go 关键字。例如:

package main

import (
    "fmt"
    "time"
)

func say(s string) {
    for i := 0; i < 5; i++ {
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func main() {
    go say("world")
    say("hello")
}

在上述代码中,go say("world") 创建了一个新的 Goroutine 来执行 say("world") 函数,而 say("hello") 则在主 Goroutine 中执行。这里需要注意的是,由于 say("hello") 是在主 Goroutine 中执行,当 say("hello") 执行完毕后,主 Goroutine 结束,整个程序也会随之结束,即使 say("world") 可能还没有执行完。为了避免这种情况,可以使用 time.Sleep 函数让主 Goroutine 等待一段时间,确保 say("world") 有足够的时间执行。

1.2 通道(Channel)

通道(Channel)是 Go 语言中用于在 Goroutine 之间进行通信的机制。它可以被看作是一个管道,数据可以从一端发送进去,从另一端接收出来。通道在 Goroutine 之间传递数据时起到了同步和通信的双重作用。

创建一个通道的语法如下:

ch := make(chan int)

上述代码创建了一个可以传递 int 类型数据的通道 ch。通道有两种主要操作:发送(<-)和接收(<-)。例如:

package main

import (
    "fmt"
)

func main() {
    ch := make(chan int)
    go func() {
        ch <- 42
    }()
    value := <-ch
    fmt.Println(value)
}

在这个例子中,首先创建了一个通道 ch。然后启动一个匿名 Goroutine,在该 Goroutine 中向通道 ch 发送数据 42。主 Goroutine 从通道 ch 接收数据并打印出来。如果没有 Goroutine 发送数据,接收操作会阻塞,直到有数据发送进来;反之,如果没有 Goroutine 接收数据,发送操作也会阻塞,直到有数据被接收。

二、Go 函数异步调用基础

2.1 异步调用的概念

在 Go 语言中,异步调用通常指的是通过 Goroutine 来并发执行函数,而不阻塞当前的执行流程。当一个函数被异步调用时,调用者会立即返回,而被调用的函数会在新的 Goroutine 中继续执行。这使得程序可以在等待某些耗时操作(如网络请求、文件读取等)完成的同时,继续执行其他任务,从而提高程序的整体效率。

2.2 简单异步调用示例

我们来看一个简单的 Go 函数异步调用示例:

package main

import (
    "fmt"
    "time"
)

func asyncFunction() {
    time.Sleep(2 * time.Second)
    fmt.Println("Async function completed")
}

func main() {
    fmt.Println("Starting main")
    go asyncFunction()
    fmt.Println("Main continues execution")
    time.Sleep(3 * time.Second)
}

在上述代码中,asyncFunction 是一个模拟耗时操作的函数,它会休眠 2 秒。在 main 函数中,通过 go asyncFunction()asyncFunction 进行异步调用。调用后,main 函数会立即继续执行,打印出 Main continues execution。由于 asyncFunction 是在新的 Goroutine 中执行,它的执行不会阻塞 main 函数。最后,通过 time.Sleep(3 * time.Second)main 函数等待 3 秒,确保 asyncFunction 有足够的时间完成并打印出 Async function completed

三、带返回值的异步调用

3.1 使用通道获取异步函数返回值

在实际编程中,我们常常需要获取异步调用函数的返回值。由于 Goroutine 是并发执行的,不能像同步函数调用那样直接获取返回值。这时,通道就派上用场了。下面是一个示例:

package main

import (
    "fmt"
    "time"
)

func asyncFunctionWithReturn() int {
    time.Sleep(2 * time.Second)
    return 42
}

func main() {
    resultCh := make(chan int)
    go func() {
        result := asyncFunctionWithReturn()
        resultCh <- result
    }()
    fmt.Println("Main continues execution")
    result := <-resultCh
    fmt.Printf("Async function result: %d\n", result)
}

在这个例子中,首先创建了一个通道 resultCh 用于接收异步函数的返回值。然后启动一个匿名 Goroutine,在该 Goroutine 中调用 asyncFunctionWithReturn 函数,并将返回值通过通道 resultCh 发送出去。主 Goroutine 在继续执行其他任务(打印 Main continues execution)后,从通道 resultCh 接收返回值并打印。

3.2 封装带返回值的异步调用

为了使代码更加简洁和可复用,我们可以将上述过程封装成一个函数:

package main

import (
    "fmt"
    "time"
)

func asyncFunctionWithReturn() int {
    time.Sleep(2 * time.Second)
    return 42
}

func asyncCall(f func() int) chan int {
    resultCh := make(chan int)
    go func() {
        result := f()
        resultCh <- result
    }()
    return resultCh
}

func main() {
    resultCh := asyncCall(asyncFunctionWithReturn)
    fmt.Println("Main continues execution")
    result := <-resultCh
    fmt.Printf("Async function result: %d\n", result)
}

在这个改进后的代码中,asyncCall 函数接受一个返回 int 类型的函数 f 作为参数。它创建一个通道 resultCh,在新的 Goroutine 中调用 f 并将返回值通过通道发送出去,最后返回这个通道。在 main 函数中,通过调用 asyncCall(asyncFunctionWithReturn) 来异步调用 asyncFunctionWithReturn 函数,并从返回的通道中获取返回值。

四、处理多个异步调用

4.1 多个异步调用无依赖关系

有时候我们需要同时执行多个无依赖关系的异步函数,并获取它们的返回值。假设我们有两个函数 func1func2,分别返回不同的结果:

package main

import (
    "fmt"
    "time"
)

func func1() int {
    time.Sleep(2 * time.Second)
    return 10
}

func func2() int {
    time.Sleep(3 * time.Second)
    return 20
}

func main() {
    result1Ch := make(chan int)
    result2Ch := make(chan int)

    go func() {
        result := func1()
        result1Ch <- result
    }()

    go func() {
        result := func2()
        result2Ch <- result
    }()

    fmt.Println("Main continues execution")

    result1 := <-result1Ch
    result2 := <-result2Ch

    fmt.Printf("Result of func1: %d\n", result1)
    fmt.Printf("Result of func2: %d\n", result2)
    fmt.Printf("Sum of results: %d\n", result1+result2)
}

在上述代码中,分别为 func1func2 创建了两个通道 result1Chresult2Ch。通过两个匿名 Goroutine 分别异步调用 func1func2,并将返回值通过对应的通道发送出去。主 Goroutine 在继续执行其他任务后,从两个通道中依次接收返回值,并进行相应的计算和打印。

4.2 使用 sync.WaitGroup 等待多个异步调用完成

虽然上述方法可以实现多个异步调用并获取返回值,但如果有很多个异步调用,管理多个通道会变得复杂。这时,我们可以使用 sync.WaitGroup 来简化这个过程。sync.WaitGroup 可以用来等待一组 Goroutine 完成。下面是改写后的代码:

package main

import (
    "fmt"
    "sync"
    "time"
)

func func1(wg *sync.WaitGroup, result *int) {
    defer wg.Done()
    time.Sleep(2 * time.Second)
    *result = 10
}

func func2(wg *sync.WaitGroup, result *int) {
    defer wg.Done()
    time.Sleep(3 * time.Second)
    *result = 20
}

func main() {
    var wg sync.WaitGroup
    var result1, result2 int

    wg.Add(2)
    go func1(&wg, &result1)
    go func2(&wg, &result2)

    fmt.Println("Main continues execution")

    wg.Wait()

    fmt.Printf("Result of func1: %d\n", result1)
    fmt.Printf("Result of func2: %d\n", result2)
    fmt.Printf("Sum of results: %d\n", result1+result2)
}

在这个代码中,sync.WaitGroupAdd 方法用于设置需要等待的 Goroutine 数量。每个 Goroutine 在完成任务后调用 wg.Done() 方法通知 WaitGroup。主 Goroutine 通过 wg.Wait() 方法阻塞,直到所有的 Goroutine 都调用了 wg.Done()。通过这种方式,我们可以更方便地管理多个异步调用,而不需要为每个异步调用单独创建通道。

五、错误处理在异步调用中的应用

5.1 异步函数返回错误

在实际开发中,异步函数可能会出现错误。我们需要一种机制来传递这些错误信息。同样可以利用通道来实现:

package main

import (
    "fmt"
    "time"
)

func asyncFunctionWithError() (int, error) {
    time.Sleep(2 * time.Second)
    // 模拟错误情况
    return 0, fmt.Errorf("simulated error")
}

func main() {
    resultCh := make(chan int, 1)
    errorCh := make(chan error, 1)

    go func() {
        result, err := asyncFunctionWithError()
        if err != nil {
            errorCh <- err
        } else {
            resultCh <- result
        }
    }()

    fmt.Println("Main continues execution")

    select {
    case result := <-resultCh:
        fmt.Printf("Async function result: %d\n", result)
    case err := <-errorCh:
        fmt.Printf("Async function error: %v\n", err)
    }
}

在这个例子中,asyncFunctionWithError 函数返回一个结果和一个错误。通过两个通道 resultCherrorCh 分别传递结果和错误信息。在匿名 Goroutine 中,根据是否有错误,将结果或错误发送到对应的通道。主 Goroutine 使用 select 语句来监听这两个通道,哪个通道先接收到数据就执行对应的分支。

5.2 多个异步调用的错误处理

当有多个异步调用时,错误处理会变得更加复杂。假设我们有多个函数,只要其中一个函数出错,整个操作就失败。可以这样处理:

package main

import (
    "fmt"
    "sync"
    "time"
)

func func1(wg *sync.WaitGroup, result *int, errCh chan error) {
    defer wg.Done()
    time.Sleep(2 * time.Second)
    // 模拟错误情况
    if true {
        errCh <- fmt.Errorf("func1 error")
        return
    }
    *result = 10
}

func func2(wg *sync.WaitGroup, result *int, errCh chan error) {
    defer wg.Done()
    time.Sleep(3 * time.Second)
    *result = 20
}

func main() {
    var wg sync.WaitGroup
    var result1, result2 int
    errCh := make(chan error, 1)

    wg.Add(2)
    go func1(&wg, &result1, errCh)
    go func2(&wg, &result2, errCh)

    fmt.Println("Main continues execution")

    go func() {
        wg.Wait()
        close(errCh)
    }()

    for err := range errCh {
        fmt.Printf("Error: %v\n", err)
        return
    }

    fmt.Printf("Result of func1: %d\n", result1)
    fmt.Printf("Result of func2: %d\n", result2)
    fmt.Printf("Sum of results: %d\n", result1+result2)
}

在这个代码中,每个异步函数都有一个 errCh 通道用于传递错误信息。如果某个函数出错,就将错误发送到 errCh 通道。主 Goroutine 通过 for... range 循环从 errCh 通道接收错误信息,如果接收到错误,就打印错误并结束程序。同时,通过一个匿名 Goroutine 在所有 Goroutine 完成后关闭 errCh 通道,以避免 for... range 陷入死循环。

六、异步调用与上下文(Context)

6.1 上下文(Context)的概念

在 Go 语言中,上下文(Context)是一种用于在 Goroutine 之间传递截止时间、取消信号等信息的机制。它在处理异步调用时非常有用,特别是当需要在某些情况下取消异步操作时。上下文可以被传递到不同的 Goroutine 中,从而统一管理这些 Goroutine 的生命周期。

6.2 使用上下文取消异步调用

下面是一个使用上下文取消异步调用的示例:

package main

import (
    "context"
    "fmt"
    "time"
)

func longRunningTask(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Task cancelled")
            return
        default:
            fmt.Println("Task is running")
            time.Sleep(1 * time.Second)
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    go longRunningTask(ctx)

    time.Sleep(5 * time.Second)
}

在上述代码中,首先通过 context.WithTimeout 创建一个带有超时时间(3 秒)的上下文 ctx 和取消函数 cancellongRunningTask 函数接收这个上下文,并在循环中通过 select 语句监听 ctx.Done() 通道。当 ctx.Done() 通道接收到数据时,说明上下文被取消,任务就会停止。在 main 函数中,启动 longRunningTask 异步任务后,等待 5 秒。由于上下文设置了 3 秒的超时时间,longRunningTask 会在 3 秒后收到取消信号并停止执行。

6.3 传递上下文到多个异步调用

当有多个异步调用时,上下文可以在这些调用之间传递,以实现统一的控制。例如:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

func task1(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Task1 cancelled")
            return
        default:
            fmt.Println("Task1 is running")
            time.Sleep(1 * time.Second)
        }
    }
}

func task2(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Task2 cancelled")
            return
        default:
            fmt.Println("Task2 is running")
            time.Sleep(1 * time.Second)
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    var wg sync.WaitGroup
    wg.Add(2)

    go task1(ctx, &wg)
    go task2(ctx, &wg)

    wg.Wait()
}

在这个例子中,task1task2 两个异步任务都接收上下文 ctx。当上下文超时或被取消时,两个任务都会收到取消信号并停止执行。通过这种方式,可以方便地管理多个相关异步调用的生命周期。

七、异步调用性能优化

7.1 减少 Goroutine 创建开销

虽然 Goroutine 的创建开销相对较小,但如果在短时间内创建大量的 Goroutine,仍然可能会对性能产生影响。例如,在一个循环中频繁创建 Goroutine 执行简单任务时,可以考虑使用一个 Goroutine 池来复用 Goroutine。下面是一个简单的 Goroutine 池示例:

package main

import (
    "fmt"
    "sync"
)

type Task func()

type Worker struct {
    taskQueue chan Task
    wg        *sync.WaitGroup
}

func NewWorker(taskQueue chan Task, wg *sync.WaitGroup) *Worker {
    return &Worker{
        taskQueue: taskQueue,
        wg:        wg,
    }
}

func (w *Worker) Start() {
    go func() {
        for task := range w.taskQueue {
            task()
            w.wg.Done()
        }
    }()
}

type WorkerPool struct {
    taskQueue chan Task
    numWorkers int
    wg        sync.WaitGroup
}

func NewWorkerPool(numWorkers, capacity int) *WorkerPool {
    taskQueue := make(chan Task, capacity)
    pool := &WorkerPool{
        taskQueue: taskQueue,
        numWorkers: numWorkers,
    }
    for i := 0; i < numWorkers; i++ {
        worker := NewWorker(taskQueue, &pool.wg)
        worker.Start()
    }
    return pool
}

func (p *WorkerPool) Submit(task Task) {
    p.wg.Add(1)
    p.taskQueue <- task
}

func (p *WorkerPool) Wait() {
    close(p.taskQueue)
    p.wg.Wait()
}

func main() {
    pool := NewWorkerPool(3, 10)
    for i := 0; i < 10; i++ {
        task := func() {
            fmt.Println("Task executed")
        }
        pool.Submit(task)
    }
    pool.Wait()
}

在上述代码中,WorkerPool 表示一个 Goroutine 池,它包含一个任务队列 taskQueue 和一定数量的 Worker。每个 Worker 从任务队列中取出任务并执行。通过这种方式,可以减少 Goroutine 的创建和销毁开销,提高性能。

7.2 优化通道操作

通道操作(发送和接收)可能会导致阻塞,从而影响性能。在设计通道时,要合理设置缓冲区大小。如果通道没有缓冲区,发送和接收操作会同步进行,即发送方会阻塞直到有接收方接收数据,接收方会阻塞直到有发送方发送数据。而带有缓冲区的通道可以在缓冲区未满时,发送操作不会阻塞。例如:

package main

import (
    "fmt"
    "time"
)

func main() {
    // 无缓冲区通道
    ch1 := make(chan int)
    go func() {
        fmt.Println("Sending to ch1")
        ch1 <- 1
        fmt.Println("Sent to ch1")
    }()
    time.Sleep(1 * time.Second)
    value1 := <-ch1
    fmt.Printf("Received from ch1: %d\n", value1)

    // 有缓冲区通道
    ch2 := make(chan int, 1)
    go func() {
        fmt.Println("Sending to ch2")
        ch2 <- 2
        fmt.Println("Sent to ch2")
    }()
    time.Sleep(1 * time.Second)
    value2 := <-ch2
    fmt.Printf("Received from ch2: %d\n", value2)
}

在这个例子中,ch1 是无缓冲区通道,发送操作 ch1 <- 1 会阻塞,直到主 Goroutine 接收数据。而 ch2 是有缓冲区通道,发送操作 ch2 <- 2 不会立即阻塞,因为缓冲区有空间。合理设置通道缓冲区大小可以避免不必要的阻塞,提高程序性能。

7.3 避免不必要的同步

在异步编程中,过多的同步操作(如互斥锁、条件变量等)会降低程序的并发性能。要尽量减少对共享资源的竞争,例如可以通过将数据结构设计为线程安全的,或者避免在多个 Goroutine 之间共享数据。如果必须共享数据,要精确控制同步操作的范围,只在必要的代码段进行同步。例如:

package main

import (
    "fmt"
    "sync"
)

type SafeCounter struct {
    counter int
    mutex   sync.Mutex
}

func (sc *SafeCounter) Increment() {
    sc.mutex.Lock()
    sc.counter++
    sc.mutex.Unlock()
}

func (sc *SafeCounter) Value() int {
    sc.mutex.Lock()
    value := sc.counter
    sc.mutex.Unlock()
    return value
}

func main() {
    var wg sync.WaitGroup
    sc := SafeCounter{}
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            sc.Increment()
        }()
    }
    wg.Wait()
    fmt.Printf("Final counter value: %d\n", sc.Value())
}

在这个例子中,SafeCounter 结构体通过互斥锁 mutex 来保证 counter 变量在多 Goroutine 环境下的安全访问。在 IncrementValue 方法中,只在对 counter 进行操作的关键代码段加锁,这样可以尽量减少锁的持有时间,提高并发性能。

通过以上性能优化方法,可以在 Go 函数异步调用时提高程序的整体性能,充分发挥 Go 语言并发编程的优势。