MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Flutter异步操作与Stream:实现实时数据更新

2022-04-091.4k 阅读

Flutter 异步操作基础

在 Flutter 开发中,异步操作是处理耗时任务的关键手段,避免阻塞主线程从而保证用户界面的流畅性。

1. Future 与异步函数

在 Dart 语言(Flutter 基于 Dart)中,Future 是表示异步操作结果的类。一个异步函数会返回一个 Future 对象。例如,假设有一个模拟网络请求的异步函数:

Future<String> fetchData() async {
  await Future.delayed(Duration(seconds: 2));
  return '模拟的数据';
}

这里 fetchData 函数被声明为 async,意味着它是一个异步函数。await 关键字用于暂停函数的执行,直到 Future 完成。在这个例子中,Future.delayed 模拟了一个 2 秒的延迟,之后返回一个字符串。

要使用这个异步函数,可以这样写:

void main() async {
  String data = await fetchData();
  print(data);
}

main 函数中,同样使用 async 声明,这样才能在函数内部使用 awaitawait 等待 fetchData 完成并获取返回的数据,然后打印出来。

2. Future 的链式调用

有时候,一个异步操作的结果需要作为另一个异步操作的输入。这时可以使用 Future 的链式调用。比如,有一个函数用于解析获取到的数据,另一个函数用于展示解析后的数据:

Future<String> parseData(String data) async {
  await Future.delayed(Duration(seconds: 1));
  return '解析后的数据: $data';
}

Future<void> displayData(String parsedData) async {
  await Future.delayed(Duration(seconds: 1));
  print(parsedData);
}

可以通过链式调用将这些操作连接起来:

void main() {
  fetchData()
    .then(parseData)
    .then(displayData)
    .catchError((error) {
      print('发生错误: $error');
    });
}

在这个链式调用中,fetchData 执行完成后,其结果作为 parseData 的输入,parseData 的结果又作为 displayData 的输入。如果任何一步出现错误,catchError 会捕获并处理错误。

Stream:实现实时数据更新

Stream 是 Dart 中用于处理异步数据流的机制,它可以在一段时间内异步生成多个数据。

1. Stream 的基本概念

Stream 就像是一个序列的事件发射器,它可以随着时间推移发送零个或多个数据。与 Future 不同,Future 只处理单个异步操作的结果,而 Stream 可以处理一系列的异步事件。例如,一个传感器可能会不断地生成数据,这时候就可以用 Stream 来处理这些连续的数据。

2. 创建 Stream

有多种方式可以创建 Stream

使用 StreamController StreamController 是一个强大的工具,用于创建和控制 Stream。例如,创建一个简单的 Stream,每隔一秒发送一个数字:

import 'dart:async';

void main() {
  final controller = StreamController<int>();
  int counter = 0;
  Timer.periodic(Duration(seconds: 1), (timer) {
    controller.add(counter++);
  });

  controller.stream.listen((data) {
    print('接收到数据: $data');
  });
}

在这个例子中,StreamController<int> 创建了一个可以发送 int 类型数据的 StreamTimer.periodic 每隔一秒调用一次回调函数,通过 controller.add 方法向 Stream 中添加数据。controller.stream.listen 用于监听 Stream 发送的数据,并在数据到达时打印出来。

使用 Stream.fromIterable 可以从一个可迭代对象创建 Stream。例如:

void main() {
  final stream = Stream.fromIterable([1, 2, 3, 4, 5]);
  stream.listen((data) {
    print('接收到数据: $data');
  });
}

这里通过 Stream.fromIterable 将一个列表转换为 Stream,然后监听这个 Stream,依次打印出列表中的元素。

3. 监听 Stream

监听 Stream 是获取其发送数据的关键步骤。listen 方法接受多个参数:

void main() {
  final stream = Stream.fromIterable([1, 2, 3, 4, 5]);
  stream.listen(
    (data) {
      print('接收到数据: $data');
    },
    onError: (error) {
      print('发生错误: $error');
    },
    onDone: () {
      print('Stream 完成');
    },
  );
}

onData 回调用于处理 Stream 发送的数据;onError 回调在 Stream 发生错误时被调用;onDone 回调在 Stream 完成发送数据时被调用。

在 Flutter 中应用异步操作与 Stream 实现实时数据更新

在 Flutter 应用中,实时数据更新是很常见的需求,比如实时显示网络数据的变化、传感器数据更新等。

1. 实时显示网络数据

假设要从网络获取实时股票价格数据。首先,需要一个模拟网络请求的函数:

Future<String> fetchStockPrice() async {
  await Future.delayed(Duration(seconds: 2));
  // 这里简单返回一个随机价格
  return (100 + Random().nextInt(50)).toString();
}

然后,使用 Stream 来不断更新价格数据:

import 'dart:async';
import 'package:flutter/material.dart';

void main() => runApp(StockApp());

class StockApp extends StatefulWidget {
  @override
  _StockAppState createState() => _StockAppState();
}

class _StockAppState extends State<StockApp> {
  final StreamController<String> _priceController = StreamController<String>();

  @override
  void initState() {
    super.initState();
    _startFetchingPrices();
  }

  void _startFetchingPrices() {
    Timer.periodic(Duration(seconds: 5), (timer) async {
      String price = await fetchStockPrice();
      _priceController.add(price);
    });
  }

  @override
  void dispose() {
    _priceController.close();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      home: Scaffold(
        appBar: AppBar(
          title: Text('实时股票价格'),
        ),
        body: Center(
          child: StreamBuilder<String>(
            stream: _priceController.stream,
            initialData: '加载中...',
            builder: (context, snapshot) {
              return Text('当前股票价格: ${snapshot.data}');
            },
          ),
        ),
      ),
    );
  }
}

在这个例子中,StockApp 是一个状态ful widget。_priceController 是一个 StreamController,用于控制股票价格的 Stream。在 initState 中,通过 Timer.periodic 每隔 5 秒调用 fetchStockPrice 获取新的价格数据,并通过 _priceController.add 发送到 Stream 中。StreamBuilder 用于监听 Stream 的数据变化,并根据新的数据更新 UI。

2. 处理传感器数据

假设要处理设备的加速度传感器数据。Flutter 有一些插件可以获取传感器数据,这里简单模拟一个获取加速度数据的 Stream

import 'dart:async';
import 'package:flutter/material.dart';

void main() => runApp(AccelerometerApp());

class AccelerometerApp extends StatefulWidget {
  @override
  _AccelerometerAppState createState() => _AccelerometerAppState();
}

class _AccelerometerAppState extends State<AccelerometerApp> {
  final StreamController<String> _accelController = StreamController<String>();

  @override
  void initState() {
    super.initState();
    _startSimulatingAccel();
  }

  void _startSimulatingAccel() {
    Timer.periodic(Duration(seconds: 1), (timer) {
      double x = Random().nextDouble();
      double y = Random().nextDouble();
      double z = Random().nextDouble();
      String accelData = 'x: $x, y: $y, z: $z';
      _accelController.add(accelData);
    });
  }

  @override
  void dispose() {
    _accelController.close();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      home: Scaffold(
        appBar: AppBar(
          title: Text('加速度传感器数据'),
        ),
        body: Center(
          child: StreamBuilder<String>(
            stream: _accelController.stream,
            initialData: '加载中...',
            builder: (context, snapshot) {
              return Text('加速度数据: ${snapshot.data}');
            },
          ),
        ),
      ),
    );
  }
}

这里通过 Timer.periodic 模拟传感器数据的生成,每隔一秒生成一组随机的加速度数据,并通过 _accelController 发送到 Stream 中。StreamBuilder 监听 Stream 并更新 UI 显示最新的传感器数据。

Stream 的高级特性

1. 转换 Stream

可以对 Stream 进行转换,以满足不同的需求。例如,对之前的股票价格 Stream 进行转换,将价格字符串转换为数值并计算平均值:

import 'dart:async';
import 'package:flutter/material.dart';

void main() => runApp(StockApp());

class StockApp extends StatefulWidget {
  @override
  _StockAppState createState() => _StockAppState();
}

class _StockAppState extends State<StockApp> {
  final StreamController<String> _priceController = StreamController<String>();

  @override
  void initState() {
    super.initState();
    _startFetchingPrices();
  }

  void _startFetchingPrices() {
    Timer.periodic(Duration(seconds: 5), (timer) async {
      String price = await fetchStockPrice();
      _priceController.add(price);
    });
  }

  @override
  void dispose() {
    _priceController.close();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    final avgStream = _priceController.stream
      .map((priceStr) => double.parse(priceStr))
      .scan((acc, price, _) => acc + price, 0)
      .map((total) => total / 3);

    return MaterialApp(
      home: Scaffold(
        appBar: AppBar(
          title: Text('实时股票价格平均'),
        ),
        body: Center(
          child: StreamBuilder<double>(
            stream: avgStream,
            initialData: 0.0,
            builder: (context, snapshot) {
              return Text('最近 3 次平均价格: ${snapshot.data}');
            },
          ),
        ),
      ),
    );
  }
}

Future<String> fetchStockPrice() async {
  await Future.delayed(Duration(seconds: 2));
  return (100 + Random().nextInt(50)).toString();
}

在这个例子中,map 方法将价格字符串转换为 double 类型。scan 方法用于计算累积和,map 方法再次使用以计算平均值。

2. 合并与广播 Stream

合并 Stream 有时候需要合并多个 Stream。例如,有两个 Stream 分别表示股票价格和成交量,需要将它们合并起来显示:

import 'dart:async';
import 'package:flutter/material.dart';

void main() => runApp(StockApp());

class StockApp extends StatefulWidget {
  @override
  _StockAppState createState() => _StockAppState();
}

class _StockAppState extends State<StockApp> {
  final StreamController<String> _priceController = StreamController<String>();
  final StreamController<String> _volumeController = StreamController<String>();

  @override
  void initState() {
    super.initState();
    _startFetchingPrices();
    _startFetchingVolumes();
  }

  void _startFetchingPrices() {
    Timer.periodic(Duration(seconds: 5), (timer) async {
      String price = await fetchStockPrice();
      _priceController.add(price);
    });
  }

  void _startFetchingVolumes() {
    Timer.periodic(Duration(seconds: 5), (timer) async {
      String volume = await fetchStockVolume();
      _volumeController.add(volume);
    });
  }

  @override
  void dispose() {
    _priceController.close();
    _volumeController.close();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    final combinedStream = StreamZip([
      _priceController.stream,
      _volumeController.stream
    ]).map((dataList) => '价格: ${dataList[0]}, 成交量: ${dataList[1]}');

    return MaterialApp(
      home: Scaffold(
        appBar: AppBar(
          title: Text('股票价格与成交量'),
        ),
        body: Center(
          child: StreamBuilder<String>(
            stream: combinedStream,
            initialData: '加载中...',
            builder: (context, snapshot) {
              return Text(snapshot.data);
            },
          ),
        ),
      ),
    );
  }
}

Future<String> fetchStockPrice() async {
  await Future.delayed(Duration(seconds: 2));
  return (100 + Random().nextInt(50)).toString();
}

Future<String> fetchStockVolume() async {
  await Future.delayed(Duration(seconds: 2));
  return (1000 + Random().nextInt(500)).toString();
}

这里使用 StreamZip 将两个 Stream 合并,StreamZip 会等待所有 Stream 都有数据时,将这些数据组合成一个列表。map 方法将列表转换为一个包含价格和成交量信息的字符串。

广播 Stream 默认情况下,Stream 是单订阅的,即只能有一个监听器。如果需要多个监听器,可以使用广播 Stream。例如:

import 'dart:async';

void main() {
  final controller = StreamController<int>.broadcast();
  int counter = 0;
  Timer.periodic(Duration(seconds: 1), (timer) {
    controller.add(counter++);
  });

  controller.stream.listen((data) {
    print('监听器 1 接收到数据: $data');
  });

  controller.stream.listen((data) {
    print('监听器 2 接收到数据: $data');
  });
}

通过 StreamController<int>.broadcast() 创建一个广播 Stream,这样可以有多个监听器同时监听该 Stream

异步操作与 Stream 的错误处理

在异步操作和 Stream 处理中,错误处理至关重要。

1. Future 的错误处理

Future 操作中,可以通过 catchError 来捕获错误。例如:

Future<String> fetchDataWithError() async {
  throw Exception('模拟错误');
  return '数据';
}

void main() {
  fetchDataWithError()
   .then((data) {
      print('获取到数据: $data');
    })
   .catchError((error) {
      print('捕获到错误: $error');
    });
}

这里 fetchDataWithError 函数抛出一个异常,catchError 捕获并处理了这个错误。

2. Stream 的错误处理

Stream 中,可以在 listen 时设置 onError 回调来处理错误。例如:

import 'dart:async';

void main() {
  final controller = StreamController<int>();
  int counter = 0;
  Timer.periodic(Duration(seconds: 1), (timer) {
    if (counter == 3) {
      controller.addError(Exception('模拟 Stream 错误'));
    } else {
      controller.add(counter++);
    }
  });

  controller.stream.listen(
    (data) {
      print('接收到数据: $data');
    },
    onError: (error) {
      print('捕获到 Stream 错误: $error');
    },
  );
}

counter 等于 3 时,通过 controller.addErrorStream 中添加一个错误,onError 回调捕获并处理了这个错误。

总结异步操作与 Stream 在 Flutter 中的应用

在 Flutter 开发中,异步操作和 Stream 为开发者提供了强大的工具来处理耗时任务和实时数据更新。通过 Future 可以优雅地处理单个异步操作,而 Stream 则擅长处理一系列随时间变化的数据。无论是实时网络数据获取、传感器数据处理还是复杂的数据转换与合并,都可以通过这些机制实现。同时,合理的错误处理也是保证应用稳定性的关键。开发者需要根据具体的业务需求,灵活运用这些特性,打造出高效、流畅且稳定的 Flutter 应用。通过深入理解和熟练使用异步操作与 Stream,可以显著提升 Flutter 应用的性能和用户体验。在实际项目中,结合 Flutter 的其他特性,如 StatefulWidget 和 StatelessWidget 的合理使用,以及与各种插件的配合,能够开发出功能丰富、响应迅速的移动应用。