MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

利用 Flutter 的 Stream 进行高效的异步数据加载

2023-07-065.6k 阅读

Flutter 中的异步编程基础

在深入了解 Flutter 的 Stream 之前,我们先来回顾一下异步编程在 Flutter 中的基础概念。Flutter 运行在单线程的 Dart 虚拟机上,但它通过异步编程模型来实现高效的用户界面更新和处理耗时操作,避免阻塞主线程。

异步函数(async/await)

在 Dart 中,asyncawait 是实现异步操作的核心关键字。async 用于标记一个函数为异步函数,这意味着该函数不会阻塞主线程,而是返回一个 Future 对象。await 只能在 async 函数内部使用,它用于暂停当前函数的执行,直到 Future 完成(resolved),并返回 Future 的结果。

以下是一个简单的异步函数示例,模拟网络请求:

Future<String> fetchData() async {
  // 模拟网络延迟
  await Future.delayed(Duration(seconds: 2));
  return 'Data from server';
}

void main() async {
  print('开始执行');
  String result = await fetchData();
  print(result);
  print('结束执行');
}

在上述代码中,fetchData 是一个异步函数,await Future.delayed 模拟了网络请求的延迟。在 main 函数中,await fetchData() 会暂停 main 函数的执行,直到 fetchData 返回结果,在此期间,主线程不会被阻塞,其他 UI 更新等操作仍然可以正常进行。

Future

Future 是 Dart 中表示异步操作结果的类。它代表一个在未来某个时间点会完成的操作,并且可以持有该操作的结果或错误。Future 有几种状态:未完成(uncompleted)、已完成(completed)和已出错(errored)。

我们可以通过 then 方法来处理 Future 的结果,例如:

Future<String> fetchData() async {
  await Future.delayed(Duration(seconds: 2));
  return 'Data from server';
}

void main() {
  print('开始执行');
  fetchData().then((result) {
    print(result);
  }).catchError((error) {
    print('发生错误: $error');
  });
  print('结束执行');
}

这里,fetchData().then 会在 fetchDataFuture 完成时执行,catchError 用于捕获 Future 执行过程中发生的错误。

Stream 简介

虽然 Future 适用于只需要获取一次结果的异步操作,但对于需要在一段时间内多次获取数据的场景,例如实时数据更新(如传感器数据、实时聊天消息等),Stream 则更为合适。

什么是 Stream

Stream 是一个事件序列,可以在一段时间内异步地生成零个或多个数据。它类似于一个可监听的序列,每当有新数据生成时,就会触发一个事件,订阅者(通过 listen 方法)可以收到这些事件并处理数据。

Stream 的类型

  1. 单订阅 Stream(Single-subscription Stream):这种 Stream 只能有一个订阅者。一旦有订阅者订阅,它就开始生成数据。如果尝试再次订阅,会抛出异常。例如,Stream.fromFuture 创建的 Stream 就是单订阅 Stream
  2. 广播 Stream(Broadcast Stream):广播 Stream 允许多个订阅者同时订阅。它会将数据广播给所有订阅者。可以通过 StreamController 创建广播 Stream

创建 Stream

使用 StreamController 创建 Stream

StreamController 是 Dart 中用于创建 Stream 的主要工具。它可以创建单订阅和广播 Stream

  1. 创建单订阅 Stream
void main() {
  final controller = StreamController<int>();
  // 订阅 Stream
  controller.stream.listen((data) {
    print('收到数据: $data');
  });
  // 添加数据到 Stream
  controller.add(1);
  controller.add(2);
  controller.add(3);
  // 关闭 Stream
  controller.close();
}

在上述代码中,StreamController<int> 创建了一个可以生成 int 类型数据的 Streamcontroller.stream.listen 用于订阅该 Stream,当有数据通过 controller.add 添加到 Stream 时,订阅者会收到并打印数据。controller.close() 用于关闭 Stream,表示不会再有新数据添加。

  1. 创建广播 Stream
void main() {
  final controller = StreamController<int>.broadcast();
  // 第一个订阅者
  controller.stream.listen((data) {
    print('订阅者1收到数据: $data');
  });
  // 第二个订阅者
  controller.stream.listen((data) {
    print('订阅者2收到数据: $data');
  });
  // 添加数据到 Stream
  controller.add(1);
  controller.add(2);
  controller.add(3);
  // 关闭 Stream
  controller.close();
}

这里使用 StreamController<int>.broadcast() 创建了一个广播 Stream,允许多个订阅者同时订阅。当数据通过 controller.add 添加时,所有订阅者都会收到数据。

使用 Stream.fromIterable 创建 Stream

Stream.fromIterable 可以从一个可迭代对象(如 List)创建一个 Stream。这个 Stream 会依次发送可迭代对象中的每个元素,然后自动关闭。

void main() {
  final stream = Stream.fromIterable([1, 2, 3, 4, 5]);
  stream.listen((data) {
    print('收到数据: $data');
  });
}

在这个例子中,Stream.fromIterable([1, 2, 3, 4, 5]) 创建了一个 Stream,它会依次将列表中的元素发送给订阅者。

使用 Stream.periodic 创建周期性生成数据的 Stream

Stream.periodic 用于创建一个按固定时间间隔生成数据的 Stream

void main() {
  final stream = Stream.periodic(const Duration(seconds: 1), (count) => count);
  stream.listen((data) {
    print('每秒收到数据: $data');
  });
  // 5 秒后取消订阅
  Future.delayed(const Duration(seconds: 5)).then((_) {
    stream.cancel();
  });
}

在上述代码中,Stream.periodic(const Duration(seconds: 1), (count) => count) 创建了一个每秒生成一个递增整数的 Streamstream.cancel() 用于在 5 秒后取消订阅,停止接收数据。

处理 Stream 数据

使用 listen 方法

listen 是最基本的处理 Stream 数据的方法。它接受一个回调函数,每当有新数据到达时,该回调函数就会被调用。

void main() {
  final stream = Stream.fromIterable([1, 2, 3, 4, 5]);
  stream.listen((data) {
    print('收到数据: $data');
  }, onError: (error) {
    print('发生错误: $error');
  }, onDone: () {
    print('Stream 已完成');
  });
}

在这个例子中,onError 回调用于处理 Stream 生成数据过程中发生的错误,onDone 回调在 Stream 关闭时被调用,表示不会再有新数据。

使用 async* 和 yield

async*yield 关键字可以用来创建异步生成器函数,该函数返回一个 Streamasync* 标记一个函数为异步生成器函数,yield 用于生成数据。

Stream<int> generateNumbers() async* {
  for (int i = 1; i <= 5; i++) {
    // 模拟异步操作
    await Future.delayed(const Duration(seconds: 1));
    yield i;
  }
}

void main() {
  generateNumbers().listen((data) {
    print('收到数据: $data');
  });
}

在上述代码中,generateNumbers 是一个异步生成器函数,它每秒生成一个数字并通过 yield 发送给 Stream 的订阅者。

使用 StreamTransformer

StreamTransformer 用于对 Stream 中的数据进行转换。它可以将一个 Stream 转换为另一个 Stream,在转换过程中对数据进行处理。

例如,我们创建一个将 int 类型数据转换为 String 类型并添加前缀的 StreamTransformer

StreamTransformer<int, String> intToStringTransformer() =>
    StreamTransformer<int, String>.fromHandlers(
      handleData: (int data, EventSink<String> sink) {
        sink.add('数字 $data');
      },
    );

void main() {
  final stream = Stream.fromIterable([1, 2, 3]);
  stream.transform(intToStringTransformer()).listen((data) {
    print(data);
  });
}

在这个例子中,intToStringTransformer 是一个 StreamTransformer,它将 int 类型的数据转换为带有前缀的 String 类型数据。stream.transform(intToStringTransformer()) 将原始 Stream 进行转换,新的 Stream 会生成转换后的数据。

在 Flutter 中利用 Stream 进行数据加载

结合 Flutter 组件使用 Stream

在 Flutter 中,我们经常需要将 Stream 与 UI 组件结合使用,以实现实时数据更新。例如,我们可以使用 StreamBuilder 组件。

import 'package:flutter/material.dart';

Stream<int> generateNumbers() async* {
  for (int i = 1; i <= 5; i++) {
    await Future.delayed(const Duration(seconds: 1));
    yield i;
  }
}

void main() => runApp(MyApp());

class MyApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      home: Scaffold(
        appBar: AppBar(
          title: Text('Stream 示例'),
        ),
        body: Center(
          child: StreamBuilder<int>(
            stream: generateNumbers(),
            builder: (context, snapshot) {
              if (snapshot.hasData) {
                return Text('当前数字: ${snapshot.data}');
              } else if (snapshot.hasError) {
                return Text('发生错误: ${snapshot.error}');
              }
              return CircularProgressIndicator();
            },
          ),
        ),
      ),
    );
  }
}

在上述代码中,StreamBuilder 监听 generateNumbers 生成的 Stream。当 Stream 有数据时(snapshot.hasData),显示当前数字;当发生错误时(snapshot.hasError),显示错误信息;在等待数据时,显示加载指示器(CircularProgressIndicator)。

处理复杂的异步数据加载场景

在实际应用中,我们可能会遇到更复杂的异步数据加载场景,例如需要从多个数据源获取数据并合并处理。

假设我们有两个 Stream,一个用于获取用户信息,另一个用于获取用户的订单列表。我们可以使用 StreamZip 将这两个 Stream 合并。

import 'package:flutter/material.dart';
import 'package:rxdart/rxdart.dart';

Stream<String> fetchUserInfo() async* {
  await Future.delayed(const Duration(seconds: 2));
  yield '用户信息';
}

Stream<List<String>> fetchUserOrders() async* {
  await Future.delayed(const Duration(seconds: 3));
  yield ['订单1', '订单2'];
}

void main() => runApp(MyApp());

class MyApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      home: Scaffold(
        appBar: AppBar(
          title: Text('合并 Stream 示例'),
        ),
        body: Center(
          child: StreamBuilder<List<dynamic>>(
            stream: Rx.zip2(fetchUserInfo(), fetchUserOrders(), (info, orders) => [info, orders]),
            builder: (context, snapshot) {
              if (snapshot.hasData) {
                final userInfo = snapshot.data![0] as String;
                final userOrders = snapshot.data![1] as List<String>;
                return Column(
                  mainAxisAlignment: MainAxisAlignment.center,
                  children: [
                    Text('用户信息: $userInfo'),
                    SizedBox(height: 16),
                    Text('订单列表: ${userOrders.join(', ')}'),
                  ],
                );
              } else if (snapshot.hasError) {
                return Text('发生错误: ${snapshot.error}');
              }
              return CircularProgressIndicator();
            },
          ),
        ),
      ),
    );
  }
}

在这个例子中,我们使用了 rxdart 库中的 Rx.zip2 方法将 fetchUserInfofetchUserOrders 两个 Stream 合并。StreamBuilder 监听合并后的 Stream,当数据都获取到后,显示用户信息和订单列表。

管理 Stream 的生命周期

在 Flutter 中,正确管理 Stream 的生命周期非常重要,尤其是在组件销毁时,要确保 Stream 被正确关闭,以避免内存泄漏。

例如,在 StatefulWidget 中,我们可以在 dispose 方法中取消 Stream 的订阅。

import 'package:flutter/material.dart';

class MyWidget extends StatefulWidget {
  @override
  _MyWidgetState createState() => _MyWidgetState();
}

class _MyWidgetState extends State<MyWidget> {
  late StreamSubscription<int> _subscription;

  @override
  void initState() {
    super.initState();
    final stream = Stream.periodic(const Duration(seconds: 1), (count) => count);
    _subscription = stream.listen((data) {
      print('收到数据: $data');
    });
  }

  @override
  void dispose() {
    _subscription.cancel();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Container();
  }
}

在上述代码中,_subscriptionStream 的订阅,在 initState 方法中进行订阅。在 dispose 方法中,通过 _subscription.cancel() 取消订阅,确保在组件销毁时,Stream 相关资源被正确释放。

性能优化与注意事项

避免内存泄漏

正如前面提到的,在组件销毁时要正确关闭 Stream 或取消订阅,以避免内存泄漏。特别是在使用单订阅 Stream 时,如果不及时关闭,可能会导致资源无法释放。

控制 Stream 的生成频率

对于周期性生成数据的 Stream(如 Stream.periodic),要合理控制生成频率,避免过于频繁地生成数据导致性能问题。如果数据更新频率过高,可能会消耗过多的资源,影响应用的流畅性。

处理背压(Backpressure)

背压是指当 Stream 生成数据的速度超过订阅者处理数据的速度时产生的问题。在 Dart 中,对于单订阅 Stream,如果不处理背压,当缓冲区满时,新数据可能会被丢弃或导致异常。可以通过 StreamControlleronCancelonPause 等回调来处理背压。

例如,以下是一个简单的处理背压的示例:

void main() {
  final controller = StreamController<int>(
    onListen: () {
      print('开始监听');
    },
    onCancel: () {
      print('取消监听');
    },
    onPause: () {
      print('暂停监听');
    },
    onResume: () {
      print('恢复监听');
    },
  );
  final subscription = controller.stream.listen((data) {
    print('收到数据: $data');
    Future.delayed(const Duration(seconds: 2));
  });
  for (int i = 1; i <= 10; i++) {
    controller.add(i);
  }
  Future.delayed(const Duration(seconds: 5)).then((_) {
    subscription.cancel();
  });
}

在这个例子中,StreamController 的各种回调用于处理与订阅相关的事件。onPauseonResume 可以用于在背压情况下暂停和恢复数据生成。

总结

通过本文,我们深入了解了 Flutter 中 Stream 的概念、创建方式、数据处理方法以及在实际应用中的使用技巧。Stream 为 Flutter 开发者提供了一种高效的异步数据加载和处理机制,特别适用于需要实时更新数据的场景。在使用 Stream 时,要注意正确管理其生命周期,避免内存泄漏,同时合理处理背压等性能问题,以确保应用的流畅性和稳定性。无论是简单的数据加载还是复杂的多数据源合并,Stream 都能为我们提供强大的支持,帮助我们构建更加高效、灵活的 Flutter 应用。