Flutter异步操作与Stream:实现实时数据更新
Flutter 异步操作基础
在 Flutter 开发中,异步操作是处理耗时任务的关键手段,避免阻塞主线程从而保证用户界面的流畅性。
1. Future 与异步函数
在 Dart 语言(Flutter 基于 Dart)中,Future
是表示异步操作结果的类。一个异步函数会返回一个 Future
对象。例如,假设有一个模拟网络请求的异步函数:
Future<String> fetchData() async {
await Future.delayed(Duration(seconds: 2));
return '模拟的数据';
}
这里 fetchData
函数被声明为 async
,意味着它是一个异步函数。await
关键字用于暂停函数的执行,直到 Future
完成。在这个例子中,Future.delayed
模拟了一个 2 秒的延迟,之后返回一个字符串。
要使用这个异步函数,可以这样写:
void main() async {
String data = await fetchData();
print(data);
}
在 main
函数中,同样使用 async
声明,这样才能在函数内部使用 await
。await
等待 fetchData
完成并获取返回的数据,然后打印出来。
2. Future 的链式调用
有时候,一个异步操作的结果需要作为另一个异步操作的输入。这时可以使用 Future
的链式调用。比如,有一个函数用于解析获取到的数据,另一个函数用于展示解析后的数据:
Future<String> parseData(String data) async {
await Future.delayed(Duration(seconds: 1));
return '解析后的数据: $data';
}
Future<void> displayData(String parsedData) async {
await Future.delayed(Duration(seconds: 1));
print(parsedData);
}
可以通过链式调用将这些操作连接起来:
void main() {
fetchData()
.then(parseData)
.then(displayData)
.catchError((error) {
print('发生错误: $error');
});
}
在这个链式调用中,fetchData
执行完成后,其结果作为 parseData
的输入,parseData
的结果又作为 displayData
的输入。如果任何一步出现错误,catchError
会捕获并处理错误。
Stream:实现实时数据更新
Stream
是 Dart 中用于处理异步数据流的机制,它可以在一段时间内异步生成多个数据。
1. Stream 的基本概念
Stream
就像是一个序列的事件发射器,它可以随着时间推移发送零个或多个数据。与 Future
不同,Future
只处理单个异步操作的结果,而 Stream
可以处理一系列的异步事件。例如,一个传感器可能会不断地生成数据,这时候就可以用 Stream
来处理这些连续的数据。
2. 创建 Stream
有多种方式可以创建 Stream
。
使用 StreamController
StreamController
是一个强大的工具,用于创建和控制 Stream
。例如,创建一个简单的 Stream
,每隔一秒发送一个数字:
import 'dart:async';
void main() {
final controller = StreamController<int>();
int counter = 0;
Timer.periodic(Duration(seconds: 1), (timer) {
controller.add(counter++);
});
controller.stream.listen((data) {
print('接收到数据: $data');
});
}
在这个例子中,StreamController<int>
创建了一个可以发送 int
类型数据的 Stream
。Timer.periodic
每隔一秒调用一次回调函数,通过 controller.add
方法向 Stream
中添加数据。controller.stream.listen
用于监听 Stream
发送的数据,并在数据到达时打印出来。
使用 Stream.fromIterable
可以从一个可迭代对象创建 Stream
。例如:
void main() {
final stream = Stream.fromIterable([1, 2, 3, 4, 5]);
stream.listen((data) {
print('接收到数据: $data');
});
}
这里通过 Stream.fromIterable
将一个列表转换为 Stream
,然后监听这个 Stream
,依次打印出列表中的元素。
3. 监听 Stream
监听 Stream
是获取其发送数据的关键步骤。listen
方法接受多个参数:
void main() {
final stream = Stream.fromIterable([1, 2, 3, 4, 5]);
stream.listen(
(data) {
print('接收到数据: $data');
},
onError: (error) {
print('发生错误: $error');
},
onDone: () {
print('Stream 完成');
},
);
}
onData
回调用于处理 Stream
发送的数据;onError
回调在 Stream
发生错误时被调用;onDone
回调在 Stream
完成发送数据时被调用。
在 Flutter 中应用异步操作与 Stream 实现实时数据更新
在 Flutter 应用中,实时数据更新是很常见的需求,比如实时显示网络数据的变化、传感器数据更新等。
1. 实时显示网络数据
假设要从网络获取实时股票价格数据。首先,需要一个模拟网络请求的函数:
Future<String> fetchStockPrice() async {
await Future.delayed(Duration(seconds: 2));
// 这里简单返回一个随机价格
return (100 + Random().nextInt(50)).toString();
}
然后,使用 Stream
来不断更新价格数据:
import 'dart:async';
import 'package:flutter/material.dart';
void main() => runApp(StockApp());
class StockApp extends StatefulWidget {
@override
_StockAppState createState() => _StockAppState();
}
class _StockAppState extends State<StockApp> {
final StreamController<String> _priceController = StreamController<String>();
@override
void initState() {
super.initState();
_startFetchingPrices();
}
void _startFetchingPrices() {
Timer.periodic(Duration(seconds: 5), (timer) async {
String price = await fetchStockPrice();
_priceController.add(price);
});
}
@override
void dispose() {
_priceController.close();
super.dispose();
}
@override
Widget build(BuildContext context) {
return MaterialApp(
home: Scaffold(
appBar: AppBar(
title: Text('实时股票价格'),
),
body: Center(
child: StreamBuilder<String>(
stream: _priceController.stream,
initialData: '加载中...',
builder: (context, snapshot) {
return Text('当前股票价格: ${snapshot.data}');
},
),
),
),
);
}
}
在这个例子中,StockApp
是一个状态ful widget。_priceController
是一个 StreamController
,用于控制股票价格的 Stream
。在 initState
中,通过 Timer.periodic
每隔 5 秒调用 fetchStockPrice
获取新的价格数据,并通过 _priceController.add
发送到 Stream
中。StreamBuilder
用于监听 Stream
的数据变化,并根据新的数据更新 UI。
2. 处理传感器数据
假设要处理设备的加速度传感器数据。Flutter 有一些插件可以获取传感器数据,这里简单模拟一个获取加速度数据的 Stream
:
import 'dart:async';
import 'package:flutter/material.dart';
void main() => runApp(AccelerometerApp());
class AccelerometerApp extends StatefulWidget {
@override
_AccelerometerAppState createState() => _AccelerometerAppState();
}
class _AccelerometerAppState extends State<AccelerometerApp> {
final StreamController<String> _accelController = StreamController<String>();
@override
void initState() {
super.initState();
_startSimulatingAccel();
}
void _startSimulatingAccel() {
Timer.periodic(Duration(seconds: 1), (timer) {
double x = Random().nextDouble();
double y = Random().nextDouble();
double z = Random().nextDouble();
String accelData = 'x: $x, y: $y, z: $z';
_accelController.add(accelData);
});
}
@override
void dispose() {
_accelController.close();
super.dispose();
}
@override
Widget build(BuildContext context) {
return MaterialApp(
home: Scaffold(
appBar: AppBar(
title: Text('加速度传感器数据'),
),
body: Center(
child: StreamBuilder<String>(
stream: _accelController.stream,
initialData: '加载中...',
builder: (context, snapshot) {
return Text('加速度数据: ${snapshot.data}');
},
),
),
),
);
}
}
这里通过 Timer.periodic
模拟传感器数据的生成,每隔一秒生成一组随机的加速度数据,并通过 _accelController
发送到 Stream
中。StreamBuilder
监听 Stream
并更新 UI 显示最新的传感器数据。
Stream 的高级特性
1. 转换 Stream
可以对 Stream
进行转换,以满足不同的需求。例如,对之前的股票价格 Stream
进行转换,将价格字符串转换为数值并计算平均值:
import 'dart:async';
import 'package:flutter/material.dart';
void main() => runApp(StockApp());
class StockApp extends StatefulWidget {
@override
_StockAppState createState() => _StockAppState();
}
class _StockAppState extends State<StockApp> {
final StreamController<String> _priceController = StreamController<String>();
@override
void initState() {
super.initState();
_startFetchingPrices();
}
void _startFetchingPrices() {
Timer.periodic(Duration(seconds: 5), (timer) async {
String price = await fetchStockPrice();
_priceController.add(price);
});
}
@override
void dispose() {
_priceController.close();
super.dispose();
}
@override
Widget build(BuildContext context) {
final avgStream = _priceController.stream
.map((priceStr) => double.parse(priceStr))
.scan((acc, price, _) => acc + price, 0)
.map((total) => total / 3);
return MaterialApp(
home: Scaffold(
appBar: AppBar(
title: Text('实时股票价格平均'),
),
body: Center(
child: StreamBuilder<double>(
stream: avgStream,
initialData: 0.0,
builder: (context, snapshot) {
return Text('最近 3 次平均价格: ${snapshot.data}');
},
),
),
),
);
}
}
Future<String> fetchStockPrice() async {
await Future.delayed(Duration(seconds: 2));
return (100 + Random().nextInt(50)).toString();
}
在这个例子中,map
方法将价格字符串转换为 double
类型。scan
方法用于计算累积和,map
方法再次使用以计算平均值。
2. 合并与广播 Stream
合并 Stream
有时候需要合并多个 Stream
。例如,有两个 Stream
分别表示股票价格和成交量,需要将它们合并起来显示:
import 'dart:async';
import 'package:flutter/material.dart';
void main() => runApp(StockApp());
class StockApp extends StatefulWidget {
@override
_StockAppState createState() => _StockAppState();
}
class _StockAppState extends State<StockApp> {
final StreamController<String> _priceController = StreamController<String>();
final StreamController<String> _volumeController = StreamController<String>();
@override
void initState() {
super.initState();
_startFetchingPrices();
_startFetchingVolumes();
}
void _startFetchingPrices() {
Timer.periodic(Duration(seconds: 5), (timer) async {
String price = await fetchStockPrice();
_priceController.add(price);
});
}
void _startFetchingVolumes() {
Timer.periodic(Duration(seconds: 5), (timer) async {
String volume = await fetchStockVolume();
_volumeController.add(volume);
});
}
@override
void dispose() {
_priceController.close();
_volumeController.close();
super.dispose();
}
@override
Widget build(BuildContext context) {
final combinedStream = StreamZip([
_priceController.stream,
_volumeController.stream
]).map((dataList) => '价格: ${dataList[0]}, 成交量: ${dataList[1]}');
return MaterialApp(
home: Scaffold(
appBar: AppBar(
title: Text('股票价格与成交量'),
),
body: Center(
child: StreamBuilder<String>(
stream: combinedStream,
initialData: '加载中...',
builder: (context, snapshot) {
return Text(snapshot.data);
},
),
),
),
);
}
}
Future<String> fetchStockPrice() async {
await Future.delayed(Duration(seconds: 2));
return (100 + Random().nextInt(50)).toString();
}
Future<String> fetchStockVolume() async {
await Future.delayed(Duration(seconds: 2));
return (1000 + Random().nextInt(500)).toString();
}
这里使用 StreamZip
将两个 Stream
合并,StreamZip
会等待所有 Stream
都有数据时,将这些数据组合成一个列表。map
方法将列表转换为一个包含价格和成交量信息的字符串。
广播 Stream
默认情况下,Stream
是单订阅的,即只能有一个监听器。如果需要多个监听器,可以使用广播 Stream
。例如:
import 'dart:async';
void main() {
final controller = StreamController<int>.broadcast();
int counter = 0;
Timer.periodic(Duration(seconds: 1), (timer) {
controller.add(counter++);
});
controller.stream.listen((data) {
print('监听器 1 接收到数据: $data');
});
controller.stream.listen((data) {
print('监听器 2 接收到数据: $data');
});
}
通过 StreamController<int>.broadcast()
创建一个广播 Stream
,这样可以有多个监听器同时监听该 Stream
。
异步操作与 Stream 的错误处理
在异步操作和 Stream
处理中,错误处理至关重要。
1. Future 的错误处理
在 Future
操作中,可以通过 catchError
来捕获错误。例如:
Future<String> fetchDataWithError() async {
throw Exception('模拟错误');
return '数据';
}
void main() {
fetchDataWithError()
.then((data) {
print('获取到数据: $data');
})
.catchError((error) {
print('捕获到错误: $error');
});
}
这里 fetchDataWithError
函数抛出一个异常,catchError
捕获并处理了这个错误。
2. Stream 的错误处理
在 Stream
中,可以在 listen
时设置 onError
回调来处理错误。例如:
import 'dart:async';
void main() {
final controller = StreamController<int>();
int counter = 0;
Timer.periodic(Duration(seconds: 1), (timer) {
if (counter == 3) {
controller.addError(Exception('模拟 Stream 错误'));
} else {
controller.add(counter++);
}
});
controller.stream.listen(
(data) {
print('接收到数据: $data');
},
onError: (error) {
print('捕获到 Stream 错误: $error');
},
);
}
当 counter
等于 3 时,通过 controller.addError
向 Stream
中添加一个错误,onError
回调捕获并处理了这个错误。
总结异步操作与 Stream 在 Flutter 中的应用
在 Flutter 开发中,异步操作和 Stream
为开发者提供了强大的工具来处理耗时任务和实时数据更新。通过 Future
可以优雅地处理单个异步操作,而 Stream
则擅长处理一系列随时间变化的数据。无论是实时网络数据获取、传感器数据处理还是复杂的数据转换与合并,都可以通过这些机制实现。同时,合理的错误处理也是保证应用稳定性的关键。开发者需要根据具体的业务需求,灵活运用这些特性,打造出高效、流畅且稳定的 Flutter 应用。通过深入理解和熟练使用异步操作与 Stream
,可以显著提升 Flutter 应用的性能和用户体验。在实际项目中,结合 Flutter 的其他特性,如 StatefulWidget 和 StatelessWidget 的合理使用,以及与各种插件的配合,能够开发出功能丰富、响应迅速的移动应用。