MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Kotlin StateFlow与SharedFlow

2021-05-112.2k 阅读

Kotlin StateFlow与SharedFlow

在Kotlin的响应式编程领域,StateFlowSharedFlow 是两个非常重要的概念。它们基于 Kotlin Flow 框架,为处理异步数据流提供了强大且灵活的方式。

StateFlow

StateFlow 是 Kotlin Flow 的一种特殊类型,主要用于表示状态。它始终持有一个值,这个值代表了当前的状态。并且 StateFlow 遵循状态模式,当状态发生变化时,订阅者会收到新的状态值。

  1. 定义与基本使用

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    class MyViewModel {
        private val _uiState = MutableStateFlow("Initial State")
        val uiState: StateFlow<String> = _uiState
    
        fun updateState(newState: String) {
            _uiState.value = newState
        }
    }
    
    fun main() = runBlocking {
        val viewModel = MyViewModel()
        viewModel.uiState.collect { state ->
            println("Collected state: $state")
        }
    
        viewModel.updateState("New State 1")
        viewModel.updateState("New State 2")
    }
    

    在上述代码中,我们首先在 MyViewModel 类中定义了一个 MutableStateFlow 实例 _uiState,并初始化为 "Initial State"。同时,我们对外暴露了一个只读的 StateFlow uiState,这样外部代码只能观察状态而不能直接修改。updateState 方法用于更新状态值。在 main 函数中,我们通过 collect 方法订阅了 uiState,每当状态更新时,collect 中的代码块就会被执行。

  2. 特性

    • 始终持有初始值:这是 StateFlow 的一个重要特性。因为它代表状态,所以必须有一个初始值,这样订阅者在订阅时就能立即收到一个有效的状态值,避免了空值的问题。
    • 状态变化通知:当 StateFlow 的值发生变化时,所有活跃的订阅者都会收到通知,并得到最新的状态值。
  3. 线程安全性 StateFlow 是线程安全的。在多线程环境下,多个线程可以安全地更新 StateFlow 的值,并且订阅者会正确地接收到更新。例如:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    class ThreadSafeViewModel {
        private val _uiState = MutableStateFlow("Initial State")
        val uiState: StateFlow<String> = _uiState
    
        fun updateStateOnDifferentThread(newState: String) {
            GlobalScope.launch {
                _uiState.value = newState
            }
        }
    }
    
    fun main() = runBlocking {
        val viewModel = ThreadSafeViewModel()
        viewModel.uiState.collect { state ->
            println("Collected state: $state")
        }
    
        viewModel.updateStateOnDifferentThread("State from different thread")
    }
    

    在这个例子中,updateStateOnDifferentThread 方法在一个新的协程中更新 StateFlow 的值,尽管是在不同的线程,订阅者依然能正确收到更新。

  4. 使用场景

    • UI 状态管理:在 Android 开发中,StateFlow 非常适合管理 UI 状态。例如,一个登录界面的状态可能有 "idle"(空闲)、"loading"(加载中)、"success"(成功)、"error"(错误)等。通过 StateFlow 来管理这些状态,UI 层可以订阅状态变化并相应地更新界面。
    • 业务逻辑状态跟踪:在业务逻辑中,比如一个订单处理流程,状态可能有 "created"(已创建)、"processing"(处理中)、"shipped"(已发货)、"delivered"(已交付)等。StateFlow 可以有效地跟踪这些状态变化,让相关的业务逻辑做出响应。

SharedFlow

SharedFlow 也是 Kotlin Flow 的一种变体,它用于在多个订阅者之间共享数据流。与 StateFlow 不同,SharedFlow 并不需要有初始值。

  1. 定义与基本使用

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    class MyPublisher {
        private val _sharedFlow = MutableSharedFlow<String>()
        val sharedFlow: SharedFlow<String> = _sharedFlow
    
        suspend fun emitData(data: String) {
            _sharedFlow.emit(data)
        }
    }
    
    fun main() = runBlocking {
        val publisher = MyPublisher()
        val job1 = launch {
            publisher.sharedFlow.collect { data ->
                println("Job 1 collected: $data")
            }
        }
    
        val job2 = launch {
            publisher.sharedFlow.collect { data ->
                println("Job 2 collected: $data")
            }
        }
    
        repeat(3) { i ->
            publisher.emitData("Data $i")
        }
    
        job1.cancel()
        job2.cancel()
    }
    

    在上述代码中,MyPublisher 类包含一个 MutableSharedFlow _sharedFlow,并对外暴露了只读的 SharedFlow sharedFlowemitData 方法用于发射数据。在 main 函数中,我们创建了两个协程来订阅 sharedFlow,每次发射数据时,两个协程都会收到通知并打印出收集到的数据。

  2. 特性

    • 无初始值SharedFlow 不像 StateFlow 那样需要初始值。这使得它更适合用于事件流等场景,因为事件的发生并不一定需要有一个初始状态。
    • 多订阅者共享:多个订阅者可以同时订阅 SharedFlow,并且都能接收到发射的数据。
    • 热流特性SharedFlow 是一个热流。这意味着即使在没有订阅者的情况下,数据依然可以被发射。当有新的订阅者订阅时,它只会收到从订阅之后发射的数据,而不会收到之前已经发射过的数据。
  3. 配置参数 MutableSharedFlow 有几个重要的配置参数:

    • replay:指定 SharedFlow 应该重放多少个最近发射的数据给新的订阅者。例如,如果 replay = 1,新的订阅者会收到最近一次发射的数据。
    class MyReplayPublisher {
        private val _sharedFlow = MutableSharedFlow<String>(replay = 1)
        val sharedFlow: SharedFlow<String> = _sharedFlow
    
        suspend fun emitData(data: String) {
            _sharedFlow.emit(data)
        }
    }
    
    fun main() = runBlocking {
        val publisher = MyReplayPublisher()
        publisher.emitData("First Data")
    
        val job = launch {
            publisher.sharedFlow.collect { data ->
                println("Collected: $data")
            }
        }
    
        publisher.emitData("Second Data")
        job.cancel()
    }
    

    在这个例子中,由于 replay = 1,新订阅的协程会收到 "Second Data",因为这是最近一次发射的数据。

    • extraBufferCapacity:指定在没有订阅者时可以缓冲的数据数量。如果 extraBufferCapacity = 0,当没有订阅者时发射数据会抛出异常。
  4. 使用场景

    • 事件总线:在应用程序中,SharedFlow 可以作为事件总线来使用。例如,不同的模块可能需要监听某些全局事件,如网络连接变化、用户登录登出等。通过 SharedFlow,这些事件可以被发射并广播给所有感兴趣的订阅者。
    • 多视图共享数据:在多视图的应用中,比如一个主从视图模式的应用,主视图的某些操作可能会产生数据,这些数据需要被多个从视图共享。SharedFlow 可以很好地满足这种需求,将数据发射给所有订阅的从视图。

StateFlow 与 SharedFlow 的对比

  1. 初始值

    • StateFlow 必须有初始值,因为它代表状态,订阅者在订阅时需要立即获取到一个有效的状态值。
    • SharedFlow 不需要初始值,更适合用于表示事件流,事件的发生并不依赖于初始状态。
  2. 数据重放

    • StateFlow 会将当前值重放给新的订阅者,因为它的目的是让订阅者获取最新的状态。
    • SharedFlow 默认不会重放数据给新订阅者,但可以通过设置 replay 参数来指定重放的数量。
  3. 使用场景

    • StateFlow 主要用于状态管理,特别是在 UI 状态管理和业务逻辑状态跟踪方面表现出色。
    • SharedFlow 更适合用于事件流处理、多订阅者共享数据以及不需要初始值的场景。
  4. 线程安全性

    • 两者都是线程安全的。StateFlow 在线程安全更新状态值方面表现良好,以确保所有订阅者能正确收到更新。SharedFlow 在多线程环境下发射数据和订阅者接收数据时也能保证线程安全。
  5. 内存消耗

    • StateFlow 由于始终持有一个状态值,其内存消耗相对稳定。
    • SharedFlow 的内存消耗会根据配置参数(如 replayextraBufferCapacity)以及发射的数据量而有所变化。如果 replay 值较大或 extraBufferCapacity 缓冲的数据较多,可能会占用较多内存。

结合 Kotlin 协程的高级应用

  1. 背压处理 在处理高流量数据流时,背压是一个重要的问题。StateFlowSharedFlow 都可以与 Kotlin 协程的背压策略相结合。例如,我们可以使用 flowOn 操作符来指定数据流在哪个调度器上执行,并通过 collect 时的 flow 构建器设置背压策略。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    class HighFlowPublisher {
        private val _sharedFlow = MutableSharedFlow<Int>(extraBufferCapacity = 10)
        val sharedFlow: SharedFlow<Int> = _sharedFlow
    
        suspend fun emitHighFlow() {
            for (i in 1..100) {
                _sharedFlow.emit(i)
                delay(10)
            }
        }
    }
    
    fun main() = runBlocking {
        val publisher = HighFlowPublisher()
        val job = launch {
            publisher.sharedFlow
              .flowOn(Dispatchers.Default)
              .collect { value ->
                    println("Collected: $value")
                    delay(50)
                }
        }
    
        publisher.emitHighFlow()
        job.cancel()
    }
    

    在这个例子中,SharedFlow 发射数据的速度比订阅者收集数据的速度快。通过 flowOn 操作符,我们将数据流的执行切换到 Dispatchers.Default,并且在 collect 中通过 delay 模拟较慢的处理速度。这样可以有效地处理背压问题,避免数据丢失。

  2. 转换操作符 StateFlowSharedFlow 都可以使用 Kotlin Flow 的各种转换操作符。例如,map 操作符可以将数据流中的值进行转换,filter 操作符可以过滤掉不符合条件的值。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    class TransformedPublisher {
        private val _sharedFlow = MutableSharedFlow<Int>()
        val sharedFlow: SharedFlow<String> = _sharedFlow
          .map { "Value: $it" }
          .filter { it.contains("5") }
    
        suspend fun emitData() {
            for (i in 1..10) {
                _sharedFlow.emit(i)
                delay(100)
            }
        }
    }
    
    fun main() = runBlocking {
        val publisher = TransformedPublisher()
        val job = launch {
            publisher.sharedFlow.collect { data ->
                println("Collected: $data")
            }
        }
    
        publisher.emitData()
        job.cancel()
    }
    

    在上述代码中,SharedFlow 发射的 Int 类型数据首先通过 map 操作符转换为 String 类型,并且值的格式为 "Value: <number>"。然后通过 filter 操作符,只保留包含 "5" 的字符串,这样订阅者只会收到符合条件的数据。

  3. 组合数据流 在实际应用中,我们常常需要组合多个数据流。combine 操作符可以将多个 Flow 组合成一个新的 Flow,当其中任何一个 Flow 发射新数据时,新的 Flow 就会根据组合逻辑发射数据。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    class DataCombiner {
        private val _flow1 = MutableStateFlow(1)
        private val _flow2 = MutableStateFlow("A")
    
        val combinedFlow: Flow<String> = combine(_flow1, _flow2) { num, str ->
            "$str - $num"
        }
    }
    
    fun main() = runBlocking {
        val combiner = DataCombiner()
        val job = launch {
            combiner.combinedFlow.collect { data ->
                println("Collected: $data")
            }
        }
    
        combiner._flow1.value = 2
        combiner._flow2.value = "B"
        job.cancel()
    }
    

    在这个例子中,_flow1_flow2 分别是一个 StateFlowcombinedFlow 通过 combine 操作符将两个 StateFlow 组合在一起。每当 _flow1_flow2 的值发生变化时,combinedFlow 会根据组合逻辑发射新的数据。

错误处理

  1. 在 StateFlow 和 SharedFlow 中处理错误 StateFlowSharedFlow 都支持错误处理。当发射数据过程中出现异常时,可以通过 catch 操作符来捕获并处理错误。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    class ErrorPublisher {
        private val _sharedFlow = MutableSharedFlow<Int>()
        val sharedFlow: SharedFlow<Int> = _sharedFlow
          .catch { e ->
                println("Caught error: $e")
                emit(0)
            }
    
        suspend fun emitError() {
            _sharedFlow.emit(1)
            throw RuntimeException("Simulated error")
            _sharedFlow.emit(2)
        }
    }
    
    fun main() = runBlocking {
        val publisher = ErrorPublisher()
        val job = launch {
            publisher.sharedFlow.collect { data ->
                println("Collected: $data")
            }
        }
    
        try {
            publisher.emitError()
        } catch (e: Exception) {
            println("Outer catch: $e")
        }
    
        job.cancel()
    }
    

    在上述代码中,SharedFlow 使用 catch 操作符捕获发射过程中的异常。当 emitError 方法抛出异常时,catch 块会捕获异常,打印错误信息,并发射一个默认值 0。外部的 try - catch 块也可以捕获异常,但由于 catch 操作符已经处理了异常,这里的 catch 块不会执行。

  2. 不同的错误处理策略 除了简单地捕获并发射默认值,还可以根据不同的异常类型采取不同的处理策略。例如,可以将特定类型的异常转换为不同的默认值,或者进行日志记录等操作。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    class CustomErrorPublisher {
        private val _sharedFlow = MutableSharedFlow<Int>()
        val sharedFlow: SharedFlow<Int> = _sharedFlow
          .catch { e ->
                if (e is ArithmeticException) {
                    println("Caught ArithmeticException: $e")
                    emit(-1)
                } else if (e is RuntimeException) {
                    println("Caught RuntimeException: $e")
                    emit(-2)
                } else {
                    throw e
                }
            }
    
        suspend fun emitCustomError() {
            _sharedFlow.emit(1)
            throw ArithmeticException("Division by zero")
            _sharedFlow.emit(2)
        }
    }
    
    fun main() = runBlocking {
        val publisher = CustomErrorPublisher()
        val job = launch {
            publisher.sharedFlow.collect { data ->
                println("Collected: $data")
            }
        }
    
        try {
            publisher.emitCustomError()
        } catch (e: Exception) {
            println("Outer catch: $e")
        }
    
        job.cancel()
    }
    

    在这个例子中,根据不同的异常类型,catch 块发射不同的默认值。对于 ArithmeticException,发射 -1;对于 RuntimeException,发射 -2;其他类型的异常则重新抛出,让外部的 try - catch 块处理。

最佳实践

  1. 在 Android 开发中的应用

    • 使用 StateFlow 管理 UI 状态:在 Android 视图模型(ViewModel)中,使用 StateFlow 来管理 UI 状态是一种常见的做法。例如,对于一个列表加载的界面,可以定义一个 StateFlow 来表示列表的加载状态(如 "loading""loaded""error")以及列表数据。
    import androidx.lifecycle.ViewModel
    import kotlinx.coroutines.flow.MutableStateFlow
    import kotlinx.coroutines.flow.StateFlow
    import kotlinx.coroutines.launch
    
    class ListViewModel : ViewModel() {
        private val _listState = MutableStateFlow<ListState>(ListState.Idle)
        val listState: StateFlow<ListState> = _listState
    
        fun loadList() {
            viewModelScope.launch {
                _listState.value = ListState.Loading
                try {
                    // 模拟网络请求
                    val data = fetchListData()
                    _listState.value = ListState.Loaded(data)
                } catch (e: Exception) {
                    _listState.value = ListState.Error(e)
                }
            }
        }
    
        private suspend fun fetchListData(): List<String> {
            delay(2000)
            return listOf("Item 1", "Item 2", "Item 3")
        }
    }
    
    sealed class ListState {
        object Idle : ListState()
        object Loading : ListState()
        data class Loaded(val data: List<String>) : ListState()
        data class Error(val exception: Exception) : ListState()
    }
    

    在这个 Android 视图模型中,ListViewModel 使用 StateFlow listState 来管理列表的加载状态。loadList 方法在协程中模拟网络请求,根据请求结果更新 StateFlow 的值。在 UI 层,可以订阅 listState 来根据不同的状态显示加载动画、列表数据或错误信息。

    • 使用 SharedFlow 处理事件:在 Android 应用中,SharedFlow 可以用于处理全局事件,如用户登录登出事件。不同的组件(如视图模型、Fragment 等)可以订阅这些事件并做出相应的反应。
    import kotlinx.coroutines.flow.MutableSharedFlow
    import kotlinx.coroutines.flow.SharedFlow
    
    class EventBus {
        private val _userEventFlow = MutableSharedFlow<UserEvent>()
        val userEventFlow: SharedFlow<UserEvent> = _userEventFlow
    
        suspend fun emitUserEvent(event: UserEvent) {
            _userEventFlow.emit(event)
        }
    }
    
    sealed class UserEvent {
        object Login : UserEvent()
        object Logout : UserEvent()
    }
    
    // 在某个视图模型中订阅事件
    class SomeViewModel : ViewModel() {
        private val eventBus = EventBus()
    
        init {
            viewModelScope.launch {
                eventBus.userEventFlow.collect { event ->
                    when (event) {
                        is UserEvent.Login -> {
                            // 处理登录后的逻辑
                        }
                        is UserEvent.Logout -> {
                            // 处理登出后的逻辑
                        }
                    }
                }
            }
        }
    }
    

    在这个例子中,EventBus 使用 SharedFlow userEventFlow 来发射用户相关的事件。SomeViewModel 订阅了这些事件,并根据不同的事件类型执行相应的逻辑。

  2. 代码结构与组织

    • 封装与抽象:将 StateFlowSharedFlow 的定义和操作封装在合适的类或模块中,以提高代码的可维护性和复用性。例如,在一个大型项目中,可以将与用户相关的 StateFlowSharedFlow 封装在一个 UserModule 中,将与订单相关的封装在 OrderModule 中。
    • 命名规范:使用清晰、有意义的命名来表示 StateFlowSharedFlow。对于 StateFlow,命名可以反映其代表的状态,如 userLoginStateFlow;对于 SharedFlow,命名可以反映其发射的数据类型或事件类型,如 orderUpdateSharedFlow
    • 分层架构:在分层架构中,StateFlowSharedFlow 应该在合适的层中使用。通常,StateFlow 用于表示业务逻辑层的状态,并将状态传递到表示层(如 UI 层);SharedFlow 可以在不同层之间传递事件,以实现跨层的通信。
  3. 性能优化

    • 合理设置参数:对于 SharedFlow,根据实际需求合理设置 replayextraBufferCapacity 参数,避免不必要的内存消耗。如果不需要重放数据,将 replay 设置为 0;如果对缓冲数据量有严格限制,合理设置 extraBufferCapacity
    • 减少不必要的发射:在 StateFlowSharedFlow 发射数据时,确保只有在真正需要时才发射。例如,在 StateFlow 表示的状态没有实际变化时,不要重复发射相同的值,以避免不必要的 UI 更新或其他处理。
    • 优化背压处理:在处理高流量数据流时,仔细选择背压策略,避免数据丢失或过度的资源消耗。例如,如果订阅者处理数据的速度较慢,可以采用 BUFFER 背压策略,并合理设置缓冲区大小。

通过深入理解和正确使用 StateFlowSharedFlow,结合 Kotlin 协程的强大功能,开发者可以构建出高效、灵活且易于维护的响应式应用程序。无论是在 Android 开发还是其他 Kotlin 应用场景中,它们都能为异步数据流处理提供强大的支持。