JavaScript异步可迭代对象的创建
2024-10-143.7k 阅读
异步可迭代对象的概念与基础
理解异步迭代
在JavaScript中,迭代是一种逐个访问集合中元素的过程。传统的迭代,如使用 for...of
循环遍历数组或字符串,是同步进行的。然而,在许多现代应用场景中,尤其是涉及到I/O操作(如网络请求、文件读取等)时,同步操作会阻塞主线程,导致应用程序卡顿。而异步迭代则允许我们在不阻塞主线程的情况下逐个处理数据。
异步可迭代对象是一种特殊的对象,它实现了异步迭代协议。这个协议定义了对象必须具有一个 Symbol.asyncIterator
方法,该方法返回一个异步迭代器。异步迭代器是一个对象,它有一个 next()
方法,该方法返回一个Promise对象。
异步迭代协议的构成
Symbol.asyncIterator
方法:- 每个异步可迭代对象必须包含一个键为
Symbol.asyncIterator
的方法。当这个方法被调用时,它应该返回一个异步迭代器对象。 - 例如,假设有一个简单的异步可迭代对象
asyncIterableObject
:
- 每个异步可迭代对象必须包含一个键为
const asyncIterableObject = {
[Symbol.asyncIterator]() {
// 这里返回一个异步迭代器对象
return {
// 实现next方法
next() {
return Promise.resolve({ value: 1, done: false });
}
};
}
};
- 异步迭代器的
next()
方法:next()
方法返回一个Promise对象。这个Promise对象在解决(resolved)时,其value
属性表示迭代的下一个值,done
属性是一个布尔值,false
表示还有更多的值可以迭代,true
表示迭代结束。- 继续上面的例子,
next()
方法返回的Promise对象:
asyncIterableObject[Symbol.asyncIterator]().next().then(result => {
console.log(result.value); // 输出 1
console.log(result.done); // 输出 false
});
创建异步可迭代对象的基本方式
基于简单数据结构创建
- 异步迭代数组:
- 假设我们有一个需要异步处理的数组,例如,数组中的每个元素需要通过异步操作(如网络请求)来处理。
- 首先,我们可以创建一个异步可迭代对象来迭代这个数组:
const asyncArray = [1, 2, 3];
const asyncIterableArray = {
[Symbol.asyncIterator]() {
let index = 0;
return {
async next() {
if (index < asyncArray.length) {
const value = asyncArray[index];
// 模拟异步操作,这里使用setTimeout
await new Promise(resolve => setTimeout(resolve, 1000));
index++;
return { value, done: false };
} else {
return { value: undefined, done: true };
}
}
};
}
};
- 然后,我们可以使用 `for await...of` 循环来异步迭代这个对象:
(async () => {
for await (const value of asyncIterableArray) {
console.log(value);
}
})();
// 这里会每隔1秒输出数组中的一个元素
- 异步迭代自定义集合:
- 假设有一个自定义的集合对象,比如一个简单的任务队列,每个任务需要异步执行。
const taskQueue = {
tasks: [
() => new Promise(resolve => setTimeout(() => { resolve('Task 1 done'); }, 1000)),
() => new Promise(resolve => setTimeout(() => { resolve('Task 2 done'); }, 1500)),
() => new Promise(resolve => setTimeout(() => { resolve('Task 3 done'); }, 2000))
],
[Symbol.asyncIterator]() {
let index = 0;
return {
async next() {
if (index < this.tasks.length) {
const task = this.tasks[index];
const result = await task();
index++;
return { value: result, done: false };
} else {
return { value: undefined, done: true };
}
}
};
}
};
- 使用 `for await...of` 循环来执行这些任务:
(async () => {
for await (const result of taskQueue) {
console.log(result);
}
})();
// 这里会按顺序,每隔相应的时间输出任务完成的结果
使用 asyncGenerator
创建
- 基本的
asyncGenerator
函数:asyncGenerator
函数是一种特殊的函数,它返回一个异步生成器对象,这个对象本身就是一个异步可迭代对象。- 例如,创建一个简单的
asyncGenerator
函数:
async function* asyncGeneratorFunction() {
yield 1;
await new Promise(resolve => setTimeout(resolve, 1000));
yield 2;
await new Promise(resolve => setTimeout(resolve, 1000));
yield 3;
}
- 可以使用 `for await...of` 循环来迭代这个异步生成器:
(async () => {
for await (const value of asyncGeneratorFunction()) {
console.log(value);
}
})();
// 这里会每隔1秒输出1, 2, 3
- 带参数的
asyncGenerator
函数:asyncGenerator
函数也可以接受参数,从而根据不同的参数生成不同的异步迭代序列。- 例如,创建一个根据参数生成一定范围内数字的异步生成器:
async function* asyncRangeGenerator(start, end) {
for (let i = start; i <= end; i++) {
await new Promise(resolve => setTimeout(resolve, 500));
yield i;
}
}
- 使用 `for await...of` 循环来迭代:
(async () => {
for await (const value of asyncRangeGenerator(1, 5)) {
console.log(value);
}
})();
// 这里会每隔0.5秒输出1到5的数字
异步可迭代对象与异步操作的结合
与 fetch
结合进行多次网络请求
- 创建一个异步可迭代的URL列表:
- 假设我们有一个URL列表,需要逐个请求这些URL并处理响应。
const urls = [
'https://example.com/api/data1',
'https://example.com/api/data2',
'https://example.com/api/data3'
];
const asyncUrlIterable = {
[Symbol.asyncIterator]() {
let index = 0;
return {
async next() {
if (index < urls.length) {
const url = urls[index];
const response = await fetch(url);
const data = await response.json();
index++;
return { value: data, done: false };
} else {
return { value: undefined, done: true };
}
}
};
}
};
- 使用
for await...of
循环处理响应:
(async () => {
for await (const data of asyncUrlIterable) {
console.log(data);
}
})();
// 这里会逐个请求URL,并处理返回的JSON数据
与文件系统操作结合
- 在Node.js中异步读取多个文件:
- 在Node.js环境下,使用
fs/promises
模块进行文件操作。假设我们有多个文件,需要逐个读取它们的内容。
- 在Node.js环境下,使用
const fs = require('fs/promises');
const fileNames = ['file1.txt', 'file2.txt', 'file3.txt'];
const asyncFileIterable = {
[Symbol.asyncIterator]() {
let index = 0;
return {
async next() {
if (index < fileNames.length) {
const fileName = fileNames[index];
const content = await fs.readFile(fileName, 'utf8');
index++;
return { value: content, done: false };
} else {
return { value: undefined, done: true };
}
}
};
}
};
- 使用
for await...of
循环处理文件内容:
(async () => {
for await (const content of asyncFileIterable) {
console.log(content);
}
})();
// 这里会逐个读取文件内容并输出
错误处理与异步可迭代对象
for await...of
循环中的错误处理
- 捕获单个异步操作的错误:
- 在异步可迭代对象的迭代过程中,如果某个异步操作抛出错误,
for await...of
循环会停止迭代,并抛出该错误。我们可以使用try...catch
块来捕获这个错误。 - 例如,修改之前的异步生成器函数,使其在某个步骤抛出错误:
- 在异步可迭代对象的迭代过程中,如果某个异步操作抛出错误,
async function* asyncGeneratorWithError() {
yield 1;
await new Promise(resolve => setTimeout(resolve, 1000));
throw new Error('Something went wrong');
yield 2;
}
- 使用 `try...catch` 处理错误:
(async () => {
try {
for await (const value of asyncGeneratorWithError()) {
console.log(value);
}
} catch (error) {
console.error('Caught error:', error.message);
}
})();
// 这里会输出1,然后捕获到错误并输出错误信息
- 处理多个异步操作中的错误:
- 当异步可迭代对象包含多个异步操作时,每个操作都可能抛出错误。我们可以在异步迭代器的
next()
方法中进行错误处理,以便更好地控制错误情况。 - 例如,修改之前的异步URL迭代对象,在
fetch
操作中处理错误:
- 当异步可迭代对象包含多个异步操作时,每个操作都可能抛出错误。我们可以在异步迭代器的
const urls = [
'https://example.com/api/data1',
'https://nonexistent-url.com',
'https://example.com/api/data3'
];
const asyncUrlIterable = {
[Symbol.asyncIterator]() {
let index = 0;
return {
async next() {
if (index < urls.length) {
const url = urls[index];
try {
const response = await fetch(url);
const data = await response.json();
index++;
return { value: data, done: false };
} catch (error) {
console.error(`Error fetching ${url}:`, error.message);
index++;
return { value: null, done: false };
}
} else {
return { value: undefined, done: true };
}
}
};
}
};
- 使用 `for await...of` 循环处理响应:
(async () => {
for await (const data of asyncUrlIterable) {
if (data) {
console.log(data);
}
}
})();
// 这里会请求URL,遇到错误的URL时输出错误信息,继续请求其他URL
异步生成器中的错误处理
- 在异步生成器函数内部处理错误:
- 异步生成器函数可以使用
try...catch
块来处理内部的错误。 - 例如,修改异步生成器函数,在内部捕获错误:
- 异步生成器函数可以使用
async function* asyncGeneratorWithInternalErrorHandling() {
try {
yield 1;
await new Promise((resolve, reject) => {
setTimeout(() => {
reject(new Error('Internal error'));
}, 1000);
});
yield 2;
} catch (error) {
console.error('Caught internal error:', error.message);
yield 'Error handled';
}
}
- 使用 `for await...of` 循环迭代:
(async () => {
for await (const value of asyncGeneratorWithInternalErrorHandling()) {
console.log(value);
}
})();
// 这里会输出1,捕获到内部错误并输出错误信息,然后输出'Error handled'
- 向异步生成器抛出错误:
- 外部代码可以通过异步生成器对象的
throw()
方法向异步生成器内部抛出错误。 - 例如:
- 外部代码可以通过异步生成器对象的
async function* asyncGeneratorForThrowingError() {
try {
yield 1;
const result = yield 2;
console.log('Result from outside:', result);
} catch (error) {
console.error('Caught thrown error:', error.message);
}
}
- 使用 `throw()` 方法抛出错误:
(async () => {
const asyncGen = asyncGeneratorForThrowingError();
const result1 = await asyncGen.next();
console.log(result1.value); // 输出 1
const result2 = await asyncGen.next();
console.log(result2.value); // 输出 2
try {
await asyncGen.throw(new Error('Thrown from outside'));
} catch (error) {
console.error('Caught outer error:', error.message);
}
})();
// 这里会输出1和2,然后在异步生成器内部捕获到从外部抛出的错误并输出错误信息
性能优化与异步可迭代对象
控制并发数量
- 使用队列和
Promise.all
控制并发:- 当异步可迭代对象中的异步操作是独立的,且我们希望控制并发数量以避免资源耗尽时,可以使用队列和
Promise.all
来实现。 - 例如,假设有一个异步任务列表,每个任务是一个网络请求,我们希望同时最多进行3个请求:
- 当异步可迭代对象中的异步操作是独立的,且我们希望控制并发数量以避免资源耗尽时,可以使用队列和
const tasks = [
() => fetch('https://example.com/api/task1'),
() => fetch('https://example.com/api/task2'),
() => fetch('https://example.com/api/task3'),
() => fetch('https://example.com/api/task4'),
() => fetch('https://example.com/api/task5')
];
const asyncTaskIterable = {
[Symbol.asyncIterator]() {
let index = 0;
return {
async next() {
if (index < tasks.length) {
const task = tasks[index];
index++;
return { value: task(), done: false };
} else {
return { value: undefined, done: true };
}
}
};
}
};
async function processTasksWithConcurrencyLimit(tasks, limit) {
const results = [];
const queue = [];
for await (const task of asyncTaskIterable) {
queue.push(task);
if (queue.length === limit) {
const completedTask = await Promise.race(queue);
results.push(completedTask);
queue.splice(queue.indexOf(completedTask), 1);
}
}
const remainingTasks = await Promise.all(queue);
results.push(...remainingTasks);
return results;
}
processTasksWithConcurrencyLimit(tasks, 3).then(results => {
console.log(results);
});
// 这里会同时最多进行3个网络请求,处理完所有任务
- 使用
async/await
和setTimeout
模拟并发控制:- 另一种方式是使用
async/await
和setTimeout
来模拟并发控制。 - 例如:
- 另一种方式是使用
const tasks = [
() => new Promise(resolve => setTimeout(() => { resolve('Task 1 done'); }, 1000)),
() => new Promise(resolve => setTimeout(() => { resolve('Task 2 done'); }, 1500)),
() => new Promise(resolve => setTimeout(() => { resolve('Task 3 done'); }, 2000)),
() => new Promise(resolve => setTimeout(() => { resolve('Task 4 done'); }, 2500)),
() => new Promise(resolve => setTimeout(() => { resolve('Task 5 done'); }, 3000))
];
async function processTasksSequentiallyWithInterval(tasks, interval) {
const results = [];
for (const task of tasks) {
const result = await task();
results.push(result);
await new Promise(resolve => setTimeout(resolve, interval));
}
return results;
}
processTasksSequentiallyWithInterval(tasks, 1000).then(results => {
console.log(results);
});
// 这里会按顺序执行任务,每个任务之间间隔1秒
缓存与复用
- 缓存异步可迭代对象的结果:
- 如果异步可迭代对象的迭代结果不会频繁变化,我们可以缓存这些结果,以避免重复的异步操作。
- 例如,创建一个异步可迭代对象,用于读取配置文件内容,并缓存结果:
const fs = require('fs/promises');
let cachedConfig;
const asyncConfigIterable = {
[Symbol.asyncIterator]() {
let index = 0;
return {
async next() {
if (index === 0) {
if (!cachedConfig) {
cachedConfig = fs.readFile('config.json', 'utf8');
}
const config = await cachedConfig;
index++;
return { value: config, done: false };
} else {
return { value: undefined, done: true };
}
}
};
}
};
- 使用 `for await...of` 循环读取配置:
(async () => {
for await (const config of asyncConfigIterable) {
console.log(config);
}
// 再次迭代,不会重复读取文件
for await (const config of asyncConfigIterable) {
console.log(config);
}
})();
- 复用异步迭代器:
- 在某些情况下,我们可能希望复用异步迭代器,而不是每次都创建一个新的。
- 例如,创建一个可复用的异步迭代器对象:
const asyncIterable = {
data: [1, 2, 3],
asyncIterator: null,
[Symbol.asyncIterator]() {
if (!this.asyncIterator) {
let index = 0;
this.asyncIterator = {
async next() {
if (index < this.data.length) {
const value = this.data[index];
index++;
return { value, done: false };
} else {
return { value: undefined, done: true };
}
}
};
}
return this.asyncIterator;
}
};
- 多次使用同一个异步迭代器:
(async () => {
const iter1 = asyncIterable[Symbol.asyncIterator]();
const iter2 = asyncIterable[Symbol.asyncIterator]();
const result1 = await iter1.next();
const result2 = await iter2.next();
console.log(result1.value === result2.value); // 输出 true
})();
高级应用场景
异步数据流处理
- 数据过滤与转换:
- 假设我们有一个异步可迭代对象,包含一些数字,我们需要过滤掉偶数,并将剩下的奇数乘以2。
async function* numberGenerator() {
yield 1;
yield 2;
yield 3;
yield 4;
yield 5;
}
async function* filterAndTransformAsyncIterable(asyncIterable) {
for await (const value of asyncIterable) {
if (value % 2!== 0) {
yield value * 2;
}
}
}
(async () => {
const filteredAndTransformed = filterAndTransformAsyncIterable(numberGenerator());
for await (const result of filteredAndTransformed) {
console.log(result);
}
})();
// 这里会输出2, 6, 10
- 数据聚合:
- 假设有一个异步可迭代对象,包含一些数字,我们需要计算这些数字的总和。
async function* numberGeneratorForAggregation() {
yield 1;
yield 2;
yield 3;
}
async function aggregateAsyncIterable(asyncIterable) {
let sum = 0;
for await (const value of asyncIterable) {
sum += value;
}
return sum;
}
(async () => {
const sum = await aggregateAsyncIterable(numberGeneratorForAggregation());
console.log(sum);
})();
// 这里会输出6
分布式系统中的异步迭代
- 在分布式计算中处理任务:
- 在分布式系统中,可能有多个节点需要处理一系列任务。我们可以创建一个异步可迭代对象,将任务分发给不同的节点进行处理。
- 假设我们有一个简单的分布式计算场景,每个节点通过网络请求接收任务并返回结果:
const nodeUrls = [
'https://node1.example.com/process',
'https://node2.example.com/process',
'https://node3.example.com/process'
];
const tasks = [1, 2, 3, 4, 5];
const asyncTaskIterable = {
[Symbol.asyncIterator]() {
let taskIndex = 0;
let nodeIndex = 0;
return {
async next() {
if (taskIndex < tasks.length) {
const task = tasks[taskIndex];
const nodeUrl = nodeUrls[nodeIndex];
const response = await fetch(nodeUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({ task })
});
const result = await response.json();
taskIndex++;
nodeIndex = (nodeIndex + 1) % nodeUrls.length;
return { value: result, done: false };
} else {
return { value: undefined, done: true };
}
}
};
}
};
- 使用 `for await...of` 循环处理任务结果:
(async () => {
for await (const result of asyncTaskIterable) {
console.log(result);
}
})();
// 这里会将任务分发给不同节点处理,并输出结果
- 分布式数据同步:
- 在分布式系统中,数据同步是一个常见的需求。我们可以使用异步可迭代对象来同步不同节点的数据。
- 假设每个节点有一个数据版本号和数据内容,我们需要将所有节点的数据同步到最新版本。
const nodeData = [
{ version: 1, data: 'Initial data' },
{ version: 2, data: 'Updated data' },
{ version: 3, data: 'Final data' }
];
const asyncNodeDataIterable = {
[Symbol.asyncIterator]() {
let index = 0;
return {
async next() {
if (index < nodeData.length) {
const data = nodeData[index];
// 模拟网络同步操作,这里使用setTimeout
await new Promise(resolve => setTimeout(resolve, 1000));
index++;
return { value: data, done: false };
} else {
return { value: undefined, done: true };
}
}
};
}
};
async function synchronizeData(asyncIterable) {
let latestVersion = 0;
let latestData;
for await (const data of asyncIterable) {
if (data.version > latestVersion) {
latestVersion = data.version;
latestData = data.data;
}
}
return latestData;
}
(async () => {
const synchronizedData = await synchronizeData(asyncNodeDataIterable);
console.log(synchronizedData);
})();
// 这里会同步数据并输出最新的数据内容