MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Kotlin中的协程与异步编程模式

2021-12-132.9k 阅读

Kotlin 协程基础

协程是什么

在传统的编程模型中,线程是操作系统调度的最小单位,多线程编程可以实现并发执行任务。然而,线程的创建和销毁开销较大,并且线程之间的同步和通信容易引发死锁等问题。协程(Coroutine)作为一种轻量级的线程替代方案,在 Kotlin 中得到了很好的支持。

协程本质上是一种用户态的轻量级线程,它可以在一个线程内暂停和恢复执行,而不需要像线程那样进行上下文切换的高昂开销。协程通过挂起函数(Suspend Function)来暂停执行,并在适当的时候恢复,这使得异步编程更加简洁和高效。

挂起函数

挂起函数是 Kotlin 协程的核心概念之一。挂起函数是一种特殊的函数,它可以暂停其执行,并将控制权交回给调用者,同时保存函数的执行状态。当满足特定条件时,挂起函数可以从暂停的地方恢复执行。

挂起函数需要使用 suspend 关键字进行修饰。例如:

suspend fun delay(time: Long) {
    // 模拟耗时操作
    Thread.sleep(time)
}

在上述代码中,delay 函数是一个挂起函数,它通过 Thread.sleep 模拟了一个耗时操作。注意,这里使用 Thread.sleep 只是为了演示,实际在协程中更推荐使用 kotlinx.coroutines.delay

协程的启动方式

Kotlin 提供了多种启动协程的方式,最常用的是 launchasync

launch

launch 用于启动一个新的协程,并返回一个 Job 对象,该对象可用于控制协程的生命周期。launch 不会返回任何结果,主要用于执行一些不需要返回值的异步任务。

import kotlinx.coroutines.*

fun main() = runBlocking {
    val job = launch {
        repeat(3) {
            delay(1000)
            println("Job is running: $it")
        }
    }
    // 等待协程执行完毕
    job.join()
    println("Main function is done")
}

在上述代码中,launch 启动了一个协程,协程内部通过 repeat 循环打印信息,每次循环之间通过 delay 暂停 1 秒。main 函数通过 job.join() 等待协程执行完毕后再继续执行。

async

async 同样用于启动一个新的协程,但它返回一个 Deferred 对象,DeferredJob 的子类型,并且可以通过 await() 方法获取协程的执行结果。async 适用于需要异步计算并获取结果的场景。

import kotlinx.coroutines.*

fun main() = runBlocking {
    val deferred = async {
        delay(2000)
        42
    }
    val result = deferred.await()
    println("The result is: $result")
}

在这段代码中,async 启动的协程模拟了一个 2 秒的耗时操作,并返回值 42。main 函数通过 deferred.await() 获取协程的执行结果并打印。

协程上下文与调度器

协程上下文

协程上下文(CoroutineContext)是一个包含协程相关信息的集合,它包括协程的调度器(Dispatcher)、协程的名称、协程的异常处理器等。每个协程都有一个与之关联的上下文。

可以通过 coroutineContext 属性在协程内部访问其上下文。例如:

import kotlinx.coroutines.*

fun main() = runBlocking {
    launch {
        println("Coroutine context: ${coroutineContext}")
    }
}

运行上述代码,可以看到打印出的协程上下文信息,其中包含了默认的调度器等信息。

调度器

调度器(Dispatcher)决定了协程在哪个线程或线程池上执行。Kotlin 提供了几种预定义的调度器:

Dispatchers.Default

Dispatchers.Default 是一个共享的后台调度器,适用于 CPU 密集型任务。它使用一个固定大小的线程池,默认线程数与 CPU 核心数相关。

import kotlinx.coroutines.*

fun main() = runBlocking {
    launch(Dispatchers.Default) {
        println("Running on Default dispatcher: ${Thread.currentThread().name}")
    }
}

上述代码中,协程在 Dispatchers.Default 调度器下执行,打印出执行该协程的线程名称。

Dispatchers.IO

Dispatchers.IO 用于执行 I/O 密集型任务,如文件操作、网络请求等。它使用一个较大的线程池,以适应 I/O 操作可能的阻塞。

import kotlinx.coroutines.*

fun main() = runBlocking {
    launch(Dispatchers.IO) {
        println("Running on IO dispatcher: ${Thread.currentThread().name}")
    }
}

这里协程在 Dispatchers.IO 调度器下执行,适合处理 I/O 相关的异步任务。

Dispatchers.Main

Dispatchers.Main 用于在 Android 应用的主线程上执行协程,主要用于更新 UI 等操作。在桌面应用中,也有类似的主线程调度器。

import kotlinx.coroutines.*
import javax.swing.SwingUtilities

fun main() = runBlocking {
    launch {
        SwingUtilities.invokeLater {
            println("Running on Swing EDT: ${Thread.currentThread().name}")
        }
    }
}

在 Java Swing 应用中,通过 SwingUtilities.invokeLater 类似的方式在主线程执行任务,在 Kotlin 中对于 Android 应用,可以在 Dispatchers.Main 调度器下更新 UI。

自定义调度器

除了使用预定义的调度器,还可以自定义调度器。例如,可以创建一个基于特定线程池的调度器:

import kotlinx.coroutines.*
import java.util.concurrent.Executors

val customDispatcher = Executors.newFixedThreadPool(5).asCoroutineDispatcher()

fun main() = runBlocking {
    launch(customDispatcher) {
        println("Running on custom dispatcher: ${Thread.currentThread().name}")
    }
    // 确保协程执行完毕后关闭线程池
    customDispatcher.close()
}

在上述代码中,通过 Executors.newFixedThreadPool 创建了一个固定大小为 5 的线程池,并将其转换为协程调度器 customDispatcher。使用自定义调度器时,需要注意在适当的时候关闭线程池,以避免资源泄漏。

异步编程模式与协程的结合

顺序执行异步任务

在实际开发中,经常会遇到需要顺序执行多个异步任务的情况。例如,先进行网络请求获取数据,然后根据获取的数据进行本地存储。使用协程可以很方便地实现这种顺序执行。

import kotlinx.coroutines.*

suspend fun fetchData(): String {
    delay(1000)
    return "Fetched data"
}

suspend fun saveData(data: String) {
    delay(1000)
    println("Data saved: $data")
}

fun main() = runBlocking {
    val data = fetchData()
    saveData(data)
    println("All tasks completed")
}

在上述代码中,fetchData 模拟了一个网络请求,saveData 模拟了本地存储操作。通过协程的挂起函数特性,fetchData 执行完毕并返回数据后,saveData 才会执行,实现了异步任务的顺序执行。

并发执行异步任务

有时候需要同时执行多个异步任务,然后等待所有任务完成后再进行下一步操作。可以使用 asyncawaitAll 来实现。

import kotlinx.coroutines.*

suspend fun task1(): Int {
    delay(1000)
    return 10
}

suspend fun task2(): Int {
    delay(1500)
    return 20
}

fun main() = runBlocking {
    val deferred1 = async { task1() }
    val deferred2 = async { task2() }
    val results = awaitAll(deferred1, deferred2)
    println("Results: ${results[0]}, ${results[1]}")
}

这里 task1task2 是两个异步任务,通过 async 同时启动。awaitAll 等待所有 Deferred 对象完成,并返回包含所有任务结果的列表。

异步任务的超时处理

在异步编程中,设置任务超时是很重要的。Kotlin 协程提供了 withTimeoutwithTimeoutOrNull 函数来实现超时处理。

import kotlinx.coroutines.*

suspend fun longRunningTask() {
    delay(2000)
    println("Task completed")
}

fun main() = runBlocking {
    try {
        withTimeout(1500) {
            longRunningTask()
        }
    } catch (e: TimeoutCancellationException) {
        println("Task timed out")
    }
}

在上述代码中,withTimeout 设定了一个 1500 毫秒的超时时间。如果 longRunningTask 执行时间超过这个时间,就会抛出 TimeoutCancellationException 异常,从而捕获并处理超时情况。withTimeoutOrNullwithTimeout 类似,但如果超时不会抛出异常,而是返回 null

协程的异常处理

未捕获异常

在协程执行过程中,如果发生未捕获的异常,默认情况下会导致整个协程的取消,并将异常传播到父协程。例如:

import kotlinx.coroutines.*

fun main() = runBlocking {
    launch {
        throw RuntimeException("Uncaught exception")
    }.join()
    println("Main function continues")
}

在上述代码中,子协程抛出了一个 RuntimeException,由于没有捕获该异常,子协程会被取消,并且异常会传播到 runBlocking 协程,导致 join() 方法抛出异常,println("Main function continues") 这行代码不会执行。

捕获异常

可以使用 try - catch 块在协程内部捕获异常。

import kotlinx.coroutines.*

fun main() = runBlocking {
    launch {
        try {
            throw RuntimeException("Caught exception")
        } catch (e: RuntimeException) {
            println("Exception caught: $e")
        }
    }.join()
    println("Main function continues")
}

这里在子协程内部使用 try - catch 块捕获了异常,所以异常不会传播到父协程,println("Main function continues") 会正常执行。

全局异常处理

对于一些无法在局部处理的异常,可以设置全局的异常处理器。通过 CoroutineExceptionHandler 来实现。

import kotlinx.coroutines.*

val exceptionHandler = CoroutineExceptionHandler { _, exception ->
    println("Caught exception: $exception")
}

fun main() = runBlocking(exceptionHandler) {
    launch {
        throw RuntimeException("Global exception")
    }
    println("Main function continues")
}

在上述代码中,通过 CoroutineExceptionHandler 定义了全局异常处理器,并在 runBlocking 中传入该处理器。当协程抛出未捕获的异常时,会由全局异常处理器进行处理,println("Main function continues") 可以正常执行。

协程与其他异步框架的集成

与 RxJava 的集成

RxJava 是一个流行的异步编程框架,Kotlin 协程可以与 RxJava 进行集成。通过 rx2-kotlinx-coroutines 库,可以方便地将 RxJava 的 Observable 转换为协程的 Flow,反之亦然。

import io.reactivex.Observable
import kotlinx.coroutines.*
import kotlinx.coroutines.rx2.await

fun main() = runBlocking {
    val observable = Observable.just(1, 2, 3)
    val result = observable.await()
    println("Result from Observable: $result")
}

在上述代码中,通过 await() 方法将 Observable 转换为协程可等待的结果。

与 Retrofit 的集成

Retrofit 是一个用于 Android 和 Java 的类型安全的 HTTP 客户端。在 Kotlin 中,可以很方便地将 Retrofit 与协程结合使用。首先,在 Retrofit 的接口定义中使用挂起函数。

import retrofit2.Response
import retrofit2.http.GET
import kotlinx.coroutines.Deferred

interface ApiService {
    @GET("data")
    suspend fun getData(): Response<String>
}

然后,在调用 Retrofit 服务时,可以在协程中进行。

import kotlinx.coroutines.*
import retrofit2.Retrofit
import retrofit2.converter.gson.GsonConverterFactory

const val BASE_URL = "https://example.com/"

fun createApi(): ApiService {
    val retrofit = Retrofit.Builder()
       .baseUrl(BASE_URL)
       .addConverterFactory(GsonConverterFactory.create())
       .build()
    return retrofit.create(ApiService::class.java)
}

fun main() = runBlocking {
    val apiService = createApi()
    val response = apiService.getData()
    if (response.isSuccessful) {
        println("Data: ${response.body()}")
    } else {
        println("Error: ${response.code()}")
    }
}

通过这种方式,将 Retrofit 的网络请求与 Kotlin 协程结合,使异步网络编程更加简洁和直观。

协程在 Android 开发中的应用

主线程更新 UI

在 Android 开发中,更新 UI 必须在主线程进行。使用 Kotlin 协程的 Dispatchers.Main 调度器,可以很方便地在异步任务完成后更新 UI。

import android.os.Bundle
import android.widget.TextView
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*

class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        val textView = findViewById<TextView>(R.id.textView)

        GlobalScope.launch(Dispatchers.IO) {
            val data = fetchData()
            withContext(Dispatchers.Main) {
                textView.text = data
            }
        }
    }

    suspend fun fetchData(): String {
        delay(2000)
        return "Fetched data from network"
    }
}

在上述代码中,GlobalScope.launchDispatchers.IO 调度器下启动一个协程来模拟网络请求,获取数据后通过 withContext(Dispatchers.Main) 切换到主线程更新 TextView 的文本。

异步任务管理

在 Android 应用中,经常有多个异步任务需要管理,例如同时发起多个网络请求,然后等待所有请求完成后再进行下一步操作。使用协程可以轻松实现这种任务管理。

import android.os.Bundle
import android.widget.TextView
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*

class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        val textView = findViewById<TextView>(R.id.textView)

        GlobalScope.launch {
            val deferred1 = async { fetchData1() }
            val deferred2 = async { fetchData2() }
            val result1 = deferred1.await()
            val result2 = deferred2.await()
            withContext(Dispatchers.Main) {
                textView.text = "Result1: $result1, Result2: $result2"
            }
        }
    }

    suspend fun fetchData1(): String {
        delay(1000)
        return "Data1"
    }

    suspend fun fetchData2(): String {
        delay(1500)
        return "Data2"
    }
}

这里通过 async 同时启动两个异步任务,await 等待任务完成获取结果,最后在主线程更新 UI。

通过以上内容,详细介绍了 Kotlin 中协程与异步编程模式的各个方面,从基础概念到实际应用,希望能帮助开发者更好地理解和运用 Kotlin 协程进行高效的异步编程。