Kotlin协程Flow流操作实战指南
2021-09-285.4k 阅读
Kotlin 协程 Flow 流操作基础
在 Kotlin 中,协程 Flow 提供了一种异步处理数据流的方式,类似于 RxJava 的 Observable,但 Kotlin Flow 是基于 Kotlin 协程构建的,与 Kotlin 语言本身的集成度更高。
Flow 的创建
-
flow 构建器
flow
构建器是创建 Flow 的基本方式。它允许你通过emit
函数来发射数据。例如,创建一个简单的发射从 1 到 5 整数的 Flow:
import kotlinx.coroutines.flow.* fun main() = runBlocking { val flow = flow { for (i in 1..5) { emit(i) } } flow.collect { value -> println("Collected: $value") } }
- 在上述代码中,
flow
构建器定义了一个数据流,emit
函数用于逐个发射数据。collect
是一个挂起函数,用于订阅并处理 Flow 发射的数据。
-
flowOf
flowOf
用于快速创建一个发射固定数量值的 Flow。例如:
import kotlinx.coroutines.flow.* fun main() = runBlocking { val flow = flowOf("Apple", "Banana", "Cherry") flow.collect { fruit -> println("Collected fruit: $fruit") } }
- 这里
flowOf
直接创建了一个发射三个字符串的 Flow,然后通过collect
进行数据收集。
-
generateSequence 转 Flow
generateSequence
可以生成一个序列,而我们可以将其转换为 Flow。例如,生成斐波那契数列的 Flow:
import kotlinx.coroutines.flow.* fun main() = runBlocking { val fibonacciFlow = generateSequence(Pair(0, 1)) { (a, b) -> Pair(b, a + b) } .map { it.first } .take(10) .flow fibonacciFlow.collect { value -> println("Fibonacci number: $value") } }
- 首先通过
generateSequence
生成斐波那契数列的序列,然后通过map
、take
等操作对序列进行处理,最后使用.flow
扩展函数将序列转换为 Flow 并进行收集。
Flow 的收集
-
collect
- 如前面示例所示,
collect
是最基本的收集函数。它是一个挂起函数,会订阅 Flow 并处理其发射的每个数据。
import kotlinx.coroutines.flow.* import kotlinx.coroutines.launch fun main() = runBlocking { val flow = flow { for (i in 1..3) { emit(i) delay(1000) } } launch { flow.collect { value -> println("Collecting in launch: $value") } } delay(4000) }
- 在这个例子中,Flow 每隔 1 秒发射一个数据,通过
launch
在一个新的协程中收集数据,主协程等待 4 秒以确保数据收集完成。
- 如前面示例所示,
-
collectIndexed
collectIndexed
不仅能获取发射的数据,还能获取数据的索引。例如:
import kotlinx.coroutines.flow.* fun main() = runBlocking { val flow = flowOf("A", "B", "C") flow.collectIndexed { index, value -> println("Index $index: $value") } }
- 这样可以方便地知道每个数据在数据流中的位置。
Flow 操作符基础
过滤操作符
-
filter
filter
用于根据给定的条件过滤 Flow 发射的数据。例如,过滤出偶数:
import kotlinx.coroutines.flow.* fun main() = runBlocking { val flow = flow { for (i in 1..10) { emit(i) } } flow.filter { it % 2 == 0 } .collect { value -> println("Filtered even number: $value") } }
- 这里
filter
操作符只允许偶数通过,然后这些偶数被收集并打印。
-
filterNotNull
filterNotNull
用于过滤掉null
值。例如:
import kotlinx.coroutines.flow.* fun main() = runBlocking { val flow = flowOf("Apple", null, "Banana") flow.filterNotNull() .collect { value -> println("Non - null value: $value") } }
- 这样
null
值就不会被收集,只有非null
的字符串会被打印。
映射操作符
-
map
map
操作符将 Flow 发射的每个数据按照指定的函数进行转换。例如,将整数转换为其平方:
import kotlinx.coroutines.flow.* fun main() = runBlocking { val flow = flow { for (i in 1..5) { emit(i) } } flow.map { it * it } .collect { value -> println("Square: $value") } }
- 每个发射的整数都被映射为其平方,然后被收集并打印。
-
mapNotNull
mapNotNull
与map
类似,但如果映射函数返回null
,该数据将被过滤掉。例如:
import kotlinx.coroutines.flow.* fun main() = runBlocking { val flow = flowOf(1, 2, 3, 4, 5) flow.mapNotNull { if (it % 2 == 0) it * 2 else null } .collect { value -> println("Mapped non - null value: $value") } }
- 这里只有偶数经过映射后的值(即偶数乘以 2)会被收集,奇数映射后为
null
被过滤掉。
合并操作符
-
zip
zip
操作符将两个 Flow 发射的数据按顺序组合在一起。例如:
import kotlinx.coroutines.flow.* fun main() = runBlocking { val flow1 = flowOf(1, 2, 3) val flow2 = flowOf("A", "B", "C") flow1.zip(flow2) { num, str -> "$num - $str" } .collect { combined -> println("Combined: $combined") } }
flow1
和flow2
发射的数据通过zip
操作符按顺序组合,然后按照指定的函数进行转换并收集。
-
combine
combine
与zip
类似,但它会在任何一个 Flow 发射新数据时,使用最新的数据进行组合。例如:
import kotlinx.coroutines.flow.* fun main() = runBlocking { val flow1 = MutableStateFlow(1) val flow2 = MutableStateFlow("A") flow1.combine(flow2) { num, str -> "$num - $str" } .collect { combined -> println("Combined: $combined") } flow1.value = 2 flow2.value = "B" }
- 这里
MutableStateFlow
是一种特殊的 Flow,它可以通过value
属性更新值。每次flow1
或flow2
更新值时,combine
操作符会使用最新的值进行组合并发射新的数据。
Flow 操作符进阶
缓冲操作符
-
buffer
buffer
操作符用于在 Flow 中引入缓冲区。它可以提高数据发射和收集的效率,特别是当发射和收集速度不匹配时。例如:
import kotlinx.coroutines.flow.* import kotlinx.coroutines.delay fun main() = runBlocking { val flow = flow { for (i in 1..10) { emit(i) delay(100) } } flow.buffer() .collect { value -> delay(200) println("Collected: $value") } }
- 在这个例子中,Flow 发射数据的速度比收集速度快,如果没有
buffer
,收集操作会阻塞发射操作。使用buffer
后,发射的数据可以先存放在缓冲区,收集操作可以从缓冲区获取数据,从而提高整体效率。
-
conflate
conflate
操作符用于处理高频率发射的数据。它会丢弃中间值,只保留最新发射的值。例如,模拟一个高频率发射数据的 Flow:
import kotlinx.coroutines.flow.* import kotlinx.coroutines.delay fun main() = runBlocking { val flow = flow { var i = 0 while (true) { emit(i++) delay(10) } } flow.conflate() .collect { value -> delay(100) println("Collected: $value") } }
- 这里 Flow 以每 10 毫秒的频率发射数据,而收集操作每 100 毫秒处理一次数据。使用
conflate
后,中间发射的数据会被丢弃,只处理最新的数据,避免了数据积压。
转换操作符
-
flatMapConcat
flatMapConcat
操作符将 Flow 发射的每个数据转换为另一个 Flow,并按顺序连接这些 Flow。例如,假设有一个发射列表的 Flow,我们想将每个列表转换为一个发射列表元素的 Flow:
import kotlinx.coroutines.flow.* fun main() = runBlocking { val flow = flowOf(listOf(1, 2), listOf(3, 4)) flow.flatMapConcat { subList -> flow { for (num in subList) { emit(num) } } } .collect { value -> println("Collected: $value") } }
- 这里
flatMapConcat
将每个列表转换为一个发射列表元素的 Flow,并按顺序连接这些 Flow,最终收集到所有的列表元素。
-
flatMapMerge
flatMapMerge
与flatMapConcat
类似,但它会并发地合并转换后的 Flow。例如:
import kotlinx.coroutines.flow.* import kotlinx.coroutines.delay fun main() = runBlocking { val flow = flowOf(1, 2) flow.flatMapMerge { num -> flow { delay(100 * num) emit("$num processed") } } .collect { value -> println("Collected: $value") } }
- 这里
flatMapMerge
将每个整数转换为一个延迟发射处理结果的 Flow,这些 Flow 会并发地合并,而不是按顺序连接,从而提高了整体处理速度。
折叠操作符
-
reduce
reduce
操作符将 Flow 发射的数据按顺序进行累积操作,最终返回一个单一的值。例如,计算 Flow 发射整数的总和:
import kotlinx.coroutines.flow.* fun main() = runBlocking { val flow = flowOf(1, 2, 3, 4) val sum = flow.reduce { acc, value -> acc + value } println("Sum: $sum") }
reduce
从第一个数据开始,使用累积函数(这里是加法)依次处理后续的数据,最终返回总和。
-
fold
fold
与reduce
类似,但它允许指定一个初始值。例如:
import kotlinx.coroutines.flow.* fun main() = runBlocking { val flow = flowOf(1, 2, 3, 4) val result = flow.fold(10) { acc, value -> acc + value } println("Result: $result") }
- 这里初始值为 10,
fold
从初始值开始,使用累积函数(加法)处理 Flow 发射的数据,最终返回累积结果。
Flow 与异步操作
异步发射数据
-
异步任务中发射数据
- 我们可以在异步任务中发射 Flow 数据。例如,通过
async
启动一个异步任务,并在任务中发射数据:
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun main() = runBlocking { val flow = flow { val deferred = async { delay(1000) 42 } emit(deferred.await()) } flow.collect { value -> println("Collected: $value") } }
- 在这个例子中,
async
启动了一个异步任务,延迟 1 秒后返回 42,Flow 等待异步任务完成并发射其结果,然后被收集。
- 我们可以在异步任务中发射 Flow 数据。例如,通过
-
异步操作符与 Flow
- 一些操作符本身就是异步的。例如,
flowOn
操作符可以改变发射数据的上下文。假设我们有一个需要在后台线程发射数据的 Flow:
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import java.util.concurrent.Executors fun main() = runBlocking { val executor = Executors.newSingleThreadExecutor() val flow = flow { for (i in 1..5) { emit(i) delay(100) } } flow.flowOn(Dispatchers.fromExecutor(executor)) .collect { value -> println("Collected on main thread: $value") } executor.shutdown() }
- 这里
flowOn
将 Flow 发射数据的上下文切换到了一个由Executors
创建的单线程执行器,而收集操作仍然在主协程(主线程)进行。
- 一些操作符本身就是异步的。例如,
异步收集数据
-
在不同协程中收集
- 可以在不同的协程中收集 Flow 数据。例如,在一个新的协程中收集数据,同时主协程可以继续执行其他任务:
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun main() = runBlocking { val flow = flow { for (i in 1..5) { emit(i) delay(1000) } } launch { flow.collect { value -> println("Collected in launch: $value") } } println("Main coroutine continues") delay(6000) }
launch
启动了一个新的协程来收集 Flow 数据,主协程打印一条消息后继续执行,最后延迟 6 秒确保收集完成。
-
异步收集操作符
collectLatest
是一个异步收集操作符,它会取消之前正在处理的数据收集,只处理最新的数据。例如,假设有一个快速发射数据的 Flow:
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.delay fun main() = runBlocking { val flow = flow { var i = 0 while (true) { emit(i++) delay(100) } } flow.collectLatest { value -> delay(500) println("Collected latest: $value") } }
- 这里 Flow 以每 100 毫秒的频率发射数据,而收集操作每 500 毫秒处理一次数据。
collectLatest
会取消之前未处理完的数据收集,只处理最新发射的数据。
Flow 在实际应用中的场景
网络请求场景
-
多个网络请求合并
- 假设我们有两个网络请求,一个获取用户信息,一个获取用户的订单列表,并且我们想将这两个结果合并。我们可以使用 Flow 的
combine
操作符。首先,定义两个模拟网络请求的函数:
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import java.util.concurrent.TimeUnit data class User(val name: String) data class Order(val orderId: Int) suspend fun getUser(): User { delay(TimeUnit.SECONDS.toMillis(1)) return User("John") } suspend fun getOrders(): List<Order> { delay(TimeUnit.SECONDS.toMillis(1)) return listOf(Order(1), Order(2)) } fun main() = runBlocking { val userFlow = flow { emit(getUser()) } val orderFlow = flow { emit(getOrders()) } userFlow.combine(orderFlow) { user, orders -> "User ${user.name} has ${orders.size} orders" } .collect { result -> println(result) } }
- 这里
userFlow
和orderFlow
分别模拟获取用户信息和订单列表的网络请求,通过combine
操作符将两个结果合并并收集。
- 假设我们有两个网络请求,一个获取用户信息,一个获取用户的订单列表,并且我们想将这两个结果合并。我们可以使用 Flow 的
-
分页网络请求
- 对于分页的网络请求,可以使用 Flow 来管理分页逻辑。假设我们有一个获取分页数据的函数:
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import java.util.concurrent.TimeUnit data class PageData(val page: Int, val data: List<String>) suspend fun getPageData(page: Int): PageData { delay(TimeUnit.SECONDS.toMillis(1)) return PageData(page, listOf("Item1", "Item2")) } fun main() = runBlocking { (1..3).asFlow() .map { page -> getPageData(page) } .collect { pageData -> println("Page ${pageData.page}: ${pageData.data.joinToString()}") } }
- 这里通过
asFlow
将整数序列转换为 Flow,然后使用map
操作符对每个页码进行网络请求获取分页数据,并收集打印。
本地数据处理场景
-
数据库数据监听
- 假设我们使用 Room 数据库,并且希望在数据库数据变化时进行相应处理。可以使用 Flow 来监听数据库数据的变化。首先,定义一个简单的 Room 数据库实体和 DAO:
import androidx.room.* import kotlinx.coroutines.flow.Flow @Entity data class Note(val id: Int, val text: String) @Dao interface NoteDao { @Query("SELECT * FROM Note") fun getAllNotes(): Flow<List<Note>> } @Database(entities = [Note::class], version = 1) abstract class AppDatabase : RoomDatabase() { abstract fun noteDao(): NoteDao }
- 然后在业务逻辑中收集数据库数据变化:
import android.os.Bundle import androidx.appcompat.app.AppCompatActivity import androidx.lifecycle.lifecycleScope import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch class MainActivity : AppCompatActivity() { override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) val db = Room.databaseBuilder( applicationContext, AppDatabase::class.java, "notes.db" ).build() lifecycleScope.launch { db.noteDao().getAllNotes().collect { notes -> notes.forEach { note -> println("Note: ${note.text}") } } } } }
- 这里通过 Room 的 Flow 返回值,在
lifecycleScope
中收集数据库数据的变化,并处理新的数据。
-
文件系统数据处理
- 假设我们要处理一个文本文件,逐行读取并进行处理。可以使用 Flow 来实现:
import kotlinx.coroutines.flow.* import java.io.File fun main() = runBlocking { val file = File("example.txt") file.bufferedReader().lineSequence() .flow .map { it.toUpperCase() } .collect { line -> println(line) } }
- 这里先通过
lineSequence
获取文件的行序列,然后转换为 Flow,使用map
操作符将每行转换为大写并收集打印。
Flow 的错误处理
异常处理
-
try - catch 处理
- 可以在
collect
块中使用传统的try - catch
来处理 Flow 发射数据时抛出的异常。例如:
import kotlinx.coroutines.flow.* fun main() = runBlocking { val flow = flow { emit(1) throw RuntimeException("Error occurred") emit(2) } try { flow.collect { value -> println("Collected: $value") } } catch (e: RuntimeException) { println("Caught exception: ${e.message}") } }
- 在这个例子中,Flow 发射完 1 后抛出异常,
try - catch
捕获到异常并打印异常信息。
- 可以在
-
catch 操作符处理
catch
操作符提供了一种更优雅的方式来处理 Flow 中的异常。例如:
import kotlinx.coroutines.flow.* fun main() = runBlocking { val flow = flow { emit(1) throw RuntimeException("Error occurred") emit(2) } flow.catch { e -> println("Caught exception: ${e.message}") } .collect { value -> println("Collected: $value") } }
catch
操作符拦截 Flow 中的异常,在处理完异常后,Flow 不会终止收集,而是继续执行collect
块(但在这个例子中,由于异常后没有数据发射,所以实际上没有更多数据收集)。
恢复处理
-
retry 操作符
retry
操作符用于在 Flow 发射数据抛出异常时进行重试。例如,假设我们有一个模拟网络请求的 Flow,可能会失败:
import kotlinx.coroutines.flow.* import kotlin.random.Random fun main() = runBlocking { val flow = flow { if (Random.nextBoolean()) { throw RuntimeException("Network error") } emit("Success") } flow.retry(3) .collect { value -> println("Collected: $value") } }
- 这里
retry(3)
表示如果 Flow 发射数据时抛出异常,最多重试 3 次,直到成功发射数据或达到最大重试次数。
-
retryWhen 操作符
retryWhen
比retry
更灵活,它允许你根据异常和重试次数来自定义重试逻辑。例如:
import kotlinx.coroutines.flow.* import kotlinx.coroutines.delay import kotlin.random.Random fun main() = runBlocking { val flow = flow { if (Random.nextBoolean()) { throw RuntimeException("Network error") } emit("Success") } flow.retryWhen { cause, attempt -> if (cause is RuntimeException && attempt < 3) { delay(1000 * attempt) true } else { false } } .collect { value -> println("Collected: $value") } }
retryWhen
中的逻辑根据异常类型和重试次数来决定是否重试,并且每次重试前延迟一定时间(延迟时间随着重试次数增加而增加)。
通过以上对 Kotlin 协程 Flow 流操作的详细介绍,从基础创建、收集,到各种操作符的使用,再到异步操作、实际应用场景以及错误处理,希望能帮助开发者全面掌握 Flow 并在项目中灵活运用。