MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Kotlin协程Flow流操作实战指南

2021-09-285.4k 阅读

Kotlin 协程 Flow 流操作基础

在 Kotlin 中,协程 Flow 提供了一种异步处理数据流的方式,类似于 RxJava 的 Observable,但 Kotlin Flow 是基于 Kotlin 协程构建的,与 Kotlin 语言本身的集成度更高。

Flow 的创建

  1. flow 构建器

    • flow 构建器是创建 Flow 的基本方式。它允许你通过 emit 函数来发射数据。例如,创建一个简单的发射从 1 到 5 整数的 Flow:
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking {
        val flow = flow {
            for (i in 1..5) {
                emit(i)
            }
        }
        flow.collect { value ->
            println("Collected: $value")
        }
    }
    
    • 在上述代码中,flow 构建器定义了一个数据流,emit 函数用于逐个发射数据。collect 是一个挂起函数,用于订阅并处理 Flow 发射的数据。
  2. flowOf

    • flowOf 用于快速创建一个发射固定数量值的 Flow。例如:
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking {
        val flow = flowOf("Apple", "Banana", "Cherry")
        flow.collect { fruit ->
            println("Collected fruit: $fruit")
        }
    }
    
    • 这里 flowOf 直接创建了一个发射三个字符串的 Flow,然后通过 collect 进行数据收集。
  3. generateSequence 转 Flow

    • generateSequence 可以生成一个序列,而我们可以将其转换为 Flow。例如,生成斐波那契数列的 Flow:
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking {
        val fibonacciFlow = generateSequence(Pair(0, 1)) { (a, b) -> Pair(b, a + b) }
           .map { it.first }
           .take(10)
           .flow
        fibonacciFlow.collect { value ->
            println("Fibonacci number: $value")
        }
    }
    
    • 首先通过 generateSequence 生成斐波那契数列的序列,然后通过 maptake 等操作对序列进行处理,最后使用 .flow 扩展函数将序列转换为 Flow 并进行收集。

Flow 的收集

  1. collect

    • 如前面示例所示,collect 是最基本的收集函数。它是一个挂起函数,会订阅 Flow 并处理其发射的每个数据。
    import kotlinx.coroutines.flow.*
    import kotlinx.coroutines.launch
    
    fun main() = runBlocking {
        val flow = flow {
            for (i in 1..3) {
                emit(i)
                delay(1000)
            }
        }
        launch {
            flow.collect { value ->
                println("Collecting in launch: $value")
            }
        }
        delay(4000)
    }
    
    • 在这个例子中,Flow 每隔 1 秒发射一个数据,通过 launch 在一个新的协程中收集数据,主协程等待 4 秒以确保数据收集完成。
  2. collectIndexed

    • collectIndexed 不仅能获取发射的数据,还能获取数据的索引。例如:
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking {
        val flow = flowOf("A", "B", "C")
        flow.collectIndexed { index, value ->
            println("Index $index: $value")
        }
    }
    
    • 这样可以方便地知道每个数据在数据流中的位置。

Flow 操作符基础

过滤操作符

  1. filter

    • filter 用于根据给定的条件过滤 Flow 发射的数据。例如,过滤出偶数:
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking {
        val flow = flow {
            for (i in 1..10) {
                emit(i)
            }
        }
        flow.filter { it % 2 == 0 }
           .collect { value ->
                println("Filtered even number: $value")
            }
    }
    
    • 这里 filter 操作符只允许偶数通过,然后这些偶数被收集并打印。
  2. filterNotNull

    • filterNotNull 用于过滤掉 null 值。例如:
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking {
        val flow = flowOf("Apple", null, "Banana")
        flow.filterNotNull()
           .collect { value ->
                println("Non - null value: $value")
            }
    }
    
    • 这样 null 值就不会被收集,只有非 null 的字符串会被打印。

映射操作符

  1. map

    • map 操作符将 Flow 发射的每个数据按照指定的函数进行转换。例如,将整数转换为其平方:
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking {
        val flow = flow {
            for (i in 1..5) {
                emit(i)
            }
        }
        flow.map { it * it }
           .collect { value ->
                println("Square: $value")
            }
    }
    
    • 每个发射的整数都被映射为其平方,然后被收集并打印。
  2. mapNotNull

    • mapNotNullmap 类似,但如果映射函数返回 null,该数据将被过滤掉。例如:
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking {
        val flow = flowOf(1, 2, 3, 4, 5)
        flow.mapNotNull { if (it % 2 == 0) it * 2 else null }
           .collect { value ->
                println("Mapped non - null value: $value")
            }
    }
    
    • 这里只有偶数经过映射后的值(即偶数乘以 2)会被收集,奇数映射后为 null 被过滤掉。

合并操作符

  1. zip

    • zip 操作符将两个 Flow 发射的数据按顺序组合在一起。例如:
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking {
        val flow1 = flowOf(1, 2, 3)
        val flow2 = flowOf("A", "B", "C")
        flow1.zip(flow2) { num, str -> "$num - $str" }
           .collect { combined ->
                println("Combined: $combined")
            }
    }
    
    • flow1flow2 发射的数据通过 zip 操作符按顺序组合,然后按照指定的函数进行转换并收集。
  2. combine

    • combinezip 类似,但它会在任何一个 Flow 发射新数据时,使用最新的数据进行组合。例如:
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking {
        val flow1 = MutableStateFlow(1)
        val flow2 = MutableStateFlow("A")
        flow1.combine(flow2) { num, str -> "$num - $str" }
           .collect { combined ->
                println("Combined: $combined")
            }
        flow1.value = 2
        flow2.value = "B"
    }
    
    • 这里 MutableStateFlow 是一种特殊的 Flow,它可以通过 value 属性更新值。每次 flow1flow2 更新值时,combine 操作符会使用最新的值进行组合并发射新的数据。

Flow 操作符进阶

缓冲操作符

  1. buffer

    • buffer 操作符用于在 Flow 中引入缓冲区。它可以提高数据发射和收集的效率,特别是当发射和收集速度不匹配时。例如:
    import kotlinx.coroutines.flow.*
    import kotlinx.coroutines.delay
    
    fun main() = runBlocking {
        val flow = flow {
            for (i in 1..10) {
                emit(i)
                delay(100)
            }
        }
        flow.buffer()
           .collect { value ->
                delay(200)
                println("Collected: $value")
            }
    }
    
    • 在这个例子中,Flow 发射数据的速度比收集速度快,如果没有 buffer,收集操作会阻塞发射操作。使用 buffer 后,发射的数据可以先存放在缓冲区,收集操作可以从缓冲区获取数据,从而提高整体效率。
  2. conflate

    • conflate 操作符用于处理高频率发射的数据。它会丢弃中间值,只保留最新发射的值。例如,模拟一个高频率发射数据的 Flow:
    import kotlinx.coroutines.flow.*
    import kotlinx.coroutines.delay
    
    fun main() = runBlocking {
        val flow = flow {
            var i = 0
            while (true) {
                emit(i++)
                delay(10)
            }
        }
        flow.conflate()
           .collect { value ->
                delay(100)
                println("Collected: $value")
            }
    }
    
    • 这里 Flow 以每 10 毫秒的频率发射数据,而收集操作每 100 毫秒处理一次数据。使用 conflate 后,中间发射的数据会被丢弃,只处理最新的数据,避免了数据积压。

转换操作符

  1. flatMapConcat

    • flatMapConcat 操作符将 Flow 发射的每个数据转换为另一个 Flow,并按顺序连接这些 Flow。例如,假设有一个发射列表的 Flow,我们想将每个列表转换为一个发射列表元素的 Flow:
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking {
        val flow = flowOf(listOf(1, 2), listOf(3, 4))
        flow.flatMapConcat { subList ->
            flow {
                for (num in subList) {
                    emit(num)
                }
            }
        }
           .collect { value ->
                println("Collected: $value")
            }
    }
    
    • 这里 flatMapConcat 将每个列表转换为一个发射列表元素的 Flow,并按顺序连接这些 Flow,最终收集到所有的列表元素。
  2. flatMapMerge

    • flatMapMergeflatMapConcat 类似,但它会并发地合并转换后的 Flow。例如:
    import kotlinx.coroutines.flow.*
    import kotlinx.coroutines.delay
    
    fun main() = runBlocking {
        val flow = flowOf(1, 2)
        flow.flatMapMerge { num ->
            flow {
                delay(100 * num)
                emit("$num processed")
            }
        }
           .collect { value ->
                println("Collected: $value")
            }
    }
    
    • 这里 flatMapMerge 将每个整数转换为一个延迟发射处理结果的 Flow,这些 Flow 会并发地合并,而不是按顺序连接,从而提高了整体处理速度。

折叠操作符

  1. reduce

    • reduce 操作符将 Flow 发射的数据按顺序进行累积操作,最终返回一个单一的值。例如,计算 Flow 发射整数的总和:
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking {
        val flow = flowOf(1, 2, 3, 4)
        val sum = flow.reduce { acc, value -> acc + value }
        println("Sum: $sum")
    }
    
    • reduce 从第一个数据开始,使用累积函数(这里是加法)依次处理后续的数据,最终返回总和。
  2. fold

    • foldreduce 类似,但它允许指定一个初始值。例如:
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking {
        val flow = flowOf(1, 2, 3, 4)
        val result = flow.fold(10) { acc, value -> acc + value }
        println("Result: $result")
    }
    
    • 这里初始值为 10,fold 从初始值开始,使用累积函数(加法)处理 Flow 发射的数据,最终返回累积结果。

Flow 与异步操作

异步发射数据

  1. 异步任务中发射数据

    • 我们可以在异步任务中发射 Flow 数据。例如,通过 async 启动一个异步任务,并在任务中发射数据:
    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking {
        val flow = flow {
            val deferred = async {
                delay(1000)
                42
            }
            emit(deferred.await())
        }
        flow.collect { value ->
            println("Collected: $value")
        }
    }
    
    • 在这个例子中,async 启动了一个异步任务,延迟 1 秒后返回 42,Flow 等待异步任务完成并发射其结果,然后被收集。
  2. 异步操作符与 Flow

    • 一些操作符本身就是异步的。例如,flowOn 操作符可以改变发射数据的上下文。假设我们有一个需要在后台线程发射数据的 Flow:
    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    import java.util.concurrent.Executors
    
    fun main() = runBlocking {
        val executor = Executors.newSingleThreadExecutor()
        val flow = flow {
            for (i in 1..5) {
                emit(i)
                delay(100)
            }
        }
        flow.flowOn(Dispatchers.fromExecutor(executor))
           .collect { value ->
                println("Collected on main thread: $value")
            }
        executor.shutdown()
    }
    
    • 这里 flowOn 将 Flow 发射数据的上下文切换到了一个由 Executors 创建的单线程执行器,而收集操作仍然在主协程(主线程)进行。

异步收集数据

  1. 在不同协程中收集

    • 可以在不同的协程中收集 Flow 数据。例如,在一个新的协程中收集数据,同时主协程可以继续执行其他任务:
    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking {
        val flow = flow {
            for (i in 1..5) {
                emit(i)
                delay(1000)
            }
        }
        launch {
            flow.collect { value ->
                println("Collected in launch: $value")
            }
        }
        println("Main coroutine continues")
        delay(6000)
    }
    
    • launch 启动了一个新的协程来收集 Flow 数据,主协程打印一条消息后继续执行,最后延迟 6 秒确保收集完成。
  2. 异步收集操作符

    • collectLatest 是一个异步收集操作符,它会取消之前正在处理的数据收集,只处理最新的数据。例如,假设有一个快速发射数据的 Flow:
    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    import kotlinx.coroutines.delay
    
    fun main() = runBlocking {
        val flow = flow {
            var i = 0
            while (true) {
                emit(i++)
                delay(100)
            }
        }
        flow.collectLatest { value ->
            delay(500)
            println("Collected latest: $value")
        }
    }
    
    • 这里 Flow 以每 100 毫秒的频率发射数据,而收集操作每 500 毫秒处理一次数据。collectLatest 会取消之前未处理完的数据收集,只处理最新发射的数据。

Flow 在实际应用中的场景

网络请求场景

  1. 多个网络请求合并

    • 假设我们有两个网络请求,一个获取用户信息,一个获取用户的订单列表,并且我们想将这两个结果合并。我们可以使用 Flow 的 combine 操作符。首先,定义两个模拟网络请求的函数:
    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    import java.util.concurrent.TimeUnit
    
    data class User(val name: String)
    data class Order(val orderId: Int)
    
    suspend fun getUser(): User {
        delay(TimeUnit.SECONDS.toMillis(1))
        return User("John")
    }
    
    suspend fun getOrders(): List<Order> {
        delay(TimeUnit.SECONDS.toMillis(1))
        return listOf(Order(1), Order(2))
    }
    
    fun main() = runBlocking {
        val userFlow = flow { emit(getUser()) }
        val orderFlow = flow { emit(getOrders()) }
        userFlow.combine(orderFlow) { user, orders ->
            "User ${user.name} has ${orders.size} orders"
        }
           .collect { result ->
                println(result)
            }
    }
    
    • 这里 userFloworderFlow 分别模拟获取用户信息和订单列表的网络请求,通过 combine 操作符将两个结果合并并收集。
  2. 分页网络请求

    • 对于分页的网络请求,可以使用 Flow 来管理分页逻辑。假设我们有一个获取分页数据的函数:
    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    import java.util.concurrent.TimeUnit
    
    data class PageData(val page: Int, val data: List<String>)
    
    suspend fun getPageData(page: Int): PageData {
        delay(TimeUnit.SECONDS.toMillis(1))
        return PageData(page, listOf("Item1", "Item2"))
    }
    
    fun main() = runBlocking {
        (1..3).asFlow()
           .map { page -> getPageData(page) }
           .collect { pageData ->
                println("Page ${pageData.page}: ${pageData.data.joinToString()}")
            }
    }
    
    • 这里通过 asFlow 将整数序列转换为 Flow,然后使用 map 操作符对每个页码进行网络请求获取分页数据,并收集打印。

本地数据处理场景

  1. 数据库数据监听

    • 假设我们使用 Room 数据库,并且希望在数据库数据变化时进行相应处理。可以使用 Flow 来监听数据库数据的变化。首先,定义一个简单的 Room 数据库实体和 DAO:
    import androidx.room.*
    import kotlinx.coroutines.flow.Flow
    
    @Entity
    data class Note(val id: Int, val text: String)
    
    @Dao
    interface NoteDao {
        @Query("SELECT * FROM Note")
        fun getAllNotes(): Flow<List<Note>>
    }
    
    @Database(entities = [Note::class], version = 1)
    abstract class AppDatabase : RoomDatabase() {
        abstract fun noteDao(): NoteDao
    }
    
    • 然后在业务逻辑中收集数据库数据变化:
    import android.os.Bundle
    import androidx.appcompat.app.AppCompatActivity
    import androidx.lifecycle.lifecycleScope
    import kotlinx.coroutines.flow.collect
    import kotlinx.coroutines.launch
    
    class MainActivity : AppCompatActivity() {
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
            val db = Room.databaseBuilder(
                applicationContext,
                AppDatabase::class.java,
                "notes.db"
            ).build()
            lifecycleScope.launch {
                db.noteDao().getAllNotes().collect { notes ->
                    notes.forEach { note ->
                        println("Note: ${note.text}")
                    }
                }
            }
        }
    }
    
    • 这里通过 Room 的 Flow 返回值,在 lifecycleScope 中收集数据库数据的变化,并处理新的数据。
  2. 文件系统数据处理

    • 假设我们要处理一个文本文件,逐行读取并进行处理。可以使用 Flow 来实现:
    import kotlinx.coroutines.flow.*
    import java.io.File
    
    fun main() = runBlocking {
        val file = File("example.txt")
        file.bufferedReader().lineSequence()
           .flow
           .map { it.toUpperCase() }
           .collect { line ->
                println(line)
            }
    }
    
    • 这里先通过 lineSequence 获取文件的行序列,然后转换为 Flow,使用 map 操作符将每行转换为大写并收集打印。

Flow 的错误处理

异常处理

  1. try - catch 处理

    • 可以在 collect 块中使用传统的 try - catch 来处理 Flow 发射数据时抛出的异常。例如:
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking {
        val flow = flow {
            emit(1)
            throw RuntimeException("Error occurred")
            emit(2)
        }
        try {
            flow.collect { value ->
                println("Collected: $value")
            }
        } catch (e: RuntimeException) {
            println("Caught exception: ${e.message}")
        }
    }
    
    • 在这个例子中,Flow 发射完 1 后抛出异常,try - catch 捕获到异常并打印异常信息。
  2. catch 操作符处理

    • catch 操作符提供了一种更优雅的方式来处理 Flow 中的异常。例如:
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking {
        val flow = flow {
            emit(1)
            throw RuntimeException("Error occurred")
            emit(2)
        }
        flow.catch { e ->
            println("Caught exception: ${e.message}")
        }
           .collect { value ->
                println("Collected: $value")
            }
    }
    
    • catch 操作符拦截 Flow 中的异常,在处理完异常后,Flow 不会终止收集,而是继续执行 collect 块(但在这个例子中,由于异常后没有数据发射,所以实际上没有更多数据收集)。

恢复处理

  1. retry 操作符

    • retry 操作符用于在 Flow 发射数据抛出异常时进行重试。例如,假设我们有一个模拟网络请求的 Flow,可能会失败:
    import kotlinx.coroutines.flow.*
    import kotlin.random.Random
    
    fun main() = runBlocking {
        val flow = flow {
            if (Random.nextBoolean()) {
                throw RuntimeException("Network error")
            }
            emit("Success")
        }
        flow.retry(3)
           .collect { value ->
                println("Collected: $value")
            }
    }
    
    • 这里 retry(3) 表示如果 Flow 发射数据时抛出异常,最多重试 3 次,直到成功发射数据或达到最大重试次数。
  2. retryWhen 操作符

    • retryWhenretry 更灵活,它允许你根据异常和重试次数来自定义重试逻辑。例如:
    import kotlinx.coroutines.flow.*
    import kotlinx.coroutines.delay
    import kotlin.random.Random
    
    fun main() = runBlocking {
        val flow = flow {
            if (Random.nextBoolean()) {
                throw RuntimeException("Network error")
            }
            emit("Success")
        }
        flow.retryWhen { cause, attempt ->
            if (cause is RuntimeException && attempt < 3) {
                delay(1000 * attempt)
                true
            } else {
                false
            }
        }
           .collect { value ->
                println("Collected: $value")
            }
    }
    
    • retryWhen 中的逻辑根据异常类型和重试次数来决定是否重试,并且每次重试前延迟一定时间(延迟时间随着重试次数增加而增加)。

通过以上对 Kotlin 协程 Flow 流操作的详细介绍,从基础创建、收集,到各种操作符的使用,再到异步操作、实际应用场景以及错误处理,希望能帮助开发者全面掌握 Flow 并在项目中灵活运用。