MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

JavaScript异步可迭代对象的创建

2024-10-143.7k 阅读

异步可迭代对象的概念与基础

理解异步迭代

在JavaScript中,迭代是一种逐个访问集合中元素的过程。传统的迭代,如使用 for...of 循环遍历数组或字符串,是同步进行的。然而,在许多现代应用场景中,尤其是涉及到I/O操作(如网络请求、文件读取等)时,同步操作会阻塞主线程,导致应用程序卡顿。而异步迭代则允许我们在不阻塞主线程的情况下逐个处理数据。

异步可迭代对象是一种特殊的对象,它实现了异步迭代协议。这个协议定义了对象必须具有一个 Symbol.asyncIterator 方法,该方法返回一个异步迭代器。异步迭代器是一个对象,它有一个 next() 方法,该方法返回一个Promise对象。

异步迭代协议的构成

  1. Symbol.asyncIterator 方法
    • 每个异步可迭代对象必须包含一个键为 Symbol.asyncIterator 的方法。当这个方法被调用时,它应该返回一个异步迭代器对象。
    • 例如,假设有一个简单的异步可迭代对象 asyncIterableObject
const asyncIterableObject = {
    [Symbol.asyncIterator]() {
        // 这里返回一个异步迭代器对象
        return {
            // 实现next方法
            next() {
                return Promise.resolve({ value: 1, done: false });
            }
        };
    }
};
  1. 异步迭代器的 next() 方法
    • next() 方法返回一个Promise对象。这个Promise对象在解决(resolved)时,其 value 属性表示迭代的下一个值,done 属性是一个布尔值,false 表示还有更多的值可以迭代,true 表示迭代结束。
    • 继续上面的例子,next() 方法返回的Promise对象:
asyncIterableObject[Symbol.asyncIterator]().next().then(result => {
    console.log(result.value); // 输出 1
    console.log(result.done); // 输出 false
});

创建异步可迭代对象的基本方式

基于简单数据结构创建

  1. 异步迭代数组
    • 假设我们有一个需要异步处理的数组,例如,数组中的每个元素需要通过异步操作(如网络请求)来处理。
    • 首先,我们可以创建一个异步可迭代对象来迭代这个数组:
const asyncArray = [1, 2, 3];
const asyncIterableArray = {
    [Symbol.asyncIterator]() {
        let index = 0;
        return {
            async next() {
                if (index < asyncArray.length) {
                    const value = asyncArray[index];
                    // 模拟异步操作,这里使用setTimeout
                    await new Promise(resolve => setTimeout(resolve, 1000));
                    index++;
                    return { value, done: false };
                } else {
                    return { value: undefined, done: true };
                }
            }
        };
    }
};
- 然后,我们可以使用 `for await...of` 循环来异步迭代这个对象:
(async () => {
    for await (const value of asyncIterableArray) {
        console.log(value);
    }
})();
// 这里会每隔1秒输出数组中的一个元素
  1. 异步迭代自定义集合
    • 假设有一个自定义的集合对象,比如一个简单的任务队列,每个任务需要异步执行。
const taskQueue = {
    tasks: [
        () => new Promise(resolve => setTimeout(() => { resolve('Task 1 done'); }, 1000)),
        () => new Promise(resolve => setTimeout(() => { resolve('Task 2 done'); }, 1500)),
        () => new Promise(resolve => setTimeout(() => { resolve('Task 3 done'); }, 2000))
    ],
    [Symbol.asyncIterator]() {
        let index = 0;
        return {
            async next() {
                if (index < this.tasks.length) {
                    const task = this.tasks[index];
                    const result = await task();
                    index++;
                    return { value: result, done: false };
                } else {
                    return { value: undefined, done: true };
                }
            }
        };
    }
};
- 使用 `for await...of` 循环来执行这些任务:
(async () => {
    for await (const result of taskQueue) {
        console.log(result);
    }
})();
// 这里会按顺序,每隔相应的时间输出任务完成的结果

使用 asyncGenerator 创建

  1. 基本的 asyncGenerator 函数
    • asyncGenerator 函数是一种特殊的函数,它返回一个异步生成器对象,这个对象本身就是一个异步可迭代对象。
    • 例如,创建一个简单的 asyncGenerator 函数:
async function* asyncGeneratorFunction() {
    yield 1;
    await new Promise(resolve => setTimeout(resolve, 1000));
    yield 2;
    await new Promise(resolve => setTimeout(resolve, 1000));
    yield 3;
}
- 可以使用 `for await...of` 循环来迭代这个异步生成器:
(async () => {
    for await (const value of asyncGeneratorFunction()) {
        console.log(value);
    }
})();
// 这里会每隔1秒输出1, 2, 3
  1. 带参数的 asyncGenerator 函数
    • asyncGenerator 函数也可以接受参数,从而根据不同的参数生成不同的异步迭代序列。
    • 例如,创建一个根据参数生成一定范围内数字的异步生成器:
async function* asyncRangeGenerator(start, end) {
    for (let i = start; i <= end; i++) {
        await new Promise(resolve => setTimeout(resolve, 500));
        yield i;
    }
}
- 使用 `for await...of` 循环来迭代:
(async () => {
    for await (const value of asyncRangeGenerator(1, 5)) {
        console.log(value);
    }
})();
// 这里会每隔0.5秒输出1到5的数字

异步可迭代对象与异步操作的结合

fetch 结合进行多次网络请求

  1. 创建一个异步可迭代的URL列表
    • 假设我们有一个URL列表,需要逐个请求这些URL并处理响应。
const urls = [
    'https://example.com/api/data1',
    'https://example.com/api/data2',
    'https://example.com/api/data3'
];
const asyncUrlIterable = {
    [Symbol.asyncIterator]() {
        let index = 0;
        return {
            async next() {
                if (index < urls.length) {
                    const url = urls[index];
                    const response = await fetch(url);
                    const data = await response.json();
                    index++;
                    return { value: data, done: false };
                } else {
                    return { value: undefined, done: true };
                }
            }
        };
    }
};
  1. 使用 for await...of 循环处理响应
(async () => {
    for await (const data of asyncUrlIterable) {
        console.log(data);
    }
})();
// 这里会逐个请求URL,并处理返回的JSON数据

与文件系统操作结合

  1. 在Node.js中异步读取多个文件
    • 在Node.js环境下,使用 fs/promises 模块进行文件操作。假设我们有多个文件,需要逐个读取它们的内容。
const fs = require('fs/promises');
const fileNames = ['file1.txt', 'file2.txt', 'file3.txt'];
const asyncFileIterable = {
    [Symbol.asyncIterator]() {
        let index = 0;
        return {
            async next() {
                if (index < fileNames.length) {
                    const fileName = fileNames[index];
                    const content = await fs.readFile(fileName, 'utf8');
                    index++;
                    return { value: content, done: false };
                } else {
                    return { value: undefined, done: true };
                }
            }
        };
    }
};
  1. 使用 for await...of 循环处理文件内容
(async () => {
    for await (const content of asyncFileIterable) {
        console.log(content);
    }
})();
// 这里会逐个读取文件内容并输出

错误处理与异步可迭代对象

for await...of 循环中的错误处理

  1. 捕获单个异步操作的错误
    • 在异步可迭代对象的迭代过程中,如果某个异步操作抛出错误,for await...of 循环会停止迭代,并抛出该错误。我们可以使用 try...catch 块来捕获这个错误。
    • 例如,修改之前的异步生成器函数,使其在某个步骤抛出错误:
async function* asyncGeneratorWithError() {
    yield 1;
    await new Promise(resolve => setTimeout(resolve, 1000));
    throw new Error('Something went wrong');
    yield 2;
}
- 使用 `try...catch` 处理错误:
(async () => {
    try {
        for await (const value of asyncGeneratorWithError()) {
            console.log(value);
        }
    } catch (error) {
        console.error('Caught error:', error.message);
    }
})();
// 这里会输出1,然后捕获到错误并输出错误信息
  1. 处理多个异步操作中的错误
    • 当异步可迭代对象包含多个异步操作时,每个操作都可能抛出错误。我们可以在异步迭代器的 next() 方法中进行错误处理,以便更好地控制错误情况。
    • 例如,修改之前的异步URL迭代对象,在 fetch 操作中处理错误:
const urls = [
    'https://example.com/api/data1',
    'https://nonexistent-url.com',
    'https://example.com/api/data3'
];
const asyncUrlIterable = {
    [Symbol.asyncIterator]() {
        let index = 0;
        return {
            async next() {
                if (index < urls.length) {
                    const url = urls[index];
                    try {
                        const response = await fetch(url);
                        const data = await response.json();
                        index++;
                        return { value: data, done: false };
                    } catch (error) {
                        console.error(`Error fetching ${url}:`, error.message);
                        index++;
                        return { value: null, done: false };
                    }
                } else {
                    return { value: undefined, done: true };
                }
            }
        };
    }
};
- 使用 `for await...of` 循环处理响应:
(async () => {
    for await (const data of asyncUrlIterable) {
        if (data) {
            console.log(data);
        }
    }
})();
// 这里会请求URL,遇到错误的URL时输出错误信息,继续请求其他URL

异步生成器中的错误处理

  1. 在异步生成器函数内部处理错误
    • 异步生成器函数可以使用 try...catch 块来处理内部的错误。
    • 例如,修改异步生成器函数,在内部捕获错误:
async function* asyncGeneratorWithInternalErrorHandling() {
    try {
        yield 1;
        await new Promise((resolve, reject) => {
            setTimeout(() => {
                reject(new Error('Internal error'));
            }, 1000);
        });
        yield 2;
    } catch (error) {
        console.error('Caught internal error:', error.message);
        yield 'Error handled';
    }
}
- 使用 `for await...of` 循环迭代:
(async () => {
    for await (const value of asyncGeneratorWithInternalErrorHandling()) {
        console.log(value);
    }
})();
// 这里会输出1,捕获到内部错误并输出错误信息,然后输出'Error handled'
  1. 向异步生成器抛出错误
    • 外部代码可以通过异步生成器对象的 throw() 方法向异步生成器内部抛出错误。
    • 例如:
async function* asyncGeneratorForThrowingError() {
    try {
        yield 1;
        const result = yield 2;
        console.log('Result from outside:', result);
    } catch (error) {
        console.error('Caught thrown error:', error.message);
    }
}
- 使用 `throw()` 方法抛出错误:
(async () => {
    const asyncGen = asyncGeneratorForThrowingError();
    const result1 = await asyncGen.next();
    console.log(result1.value); // 输出 1
    const result2 = await asyncGen.next();
    console.log(result2.value); // 输出 2
    try {
        await asyncGen.throw(new Error('Thrown from outside'));
    } catch (error) {
        console.error('Caught outer error:', error.message);
    }
})();
// 这里会输出1和2,然后在异步生成器内部捕获到从外部抛出的错误并输出错误信息

性能优化与异步可迭代对象

控制并发数量

  1. 使用队列和 Promise.all 控制并发
    • 当异步可迭代对象中的异步操作是独立的,且我们希望控制并发数量以避免资源耗尽时,可以使用队列和 Promise.all 来实现。
    • 例如,假设有一个异步任务列表,每个任务是一个网络请求,我们希望同时最多进行3个请求:
const tasks = [
    () => fetch('https://example.com/api/task1'),
    () => fetch('https://example.com/api/task2'),
    () => fetch('https://example.com/api/task3'),
    () => fetch('https://example.com/api/task4'),
    () => fetch('https://example.com/api/task5')
];
const asyncTaskIterable = {
    [Symbol.asyncIterator]() {
        let index = 0;
        return {
            async next() {
                if (index < tasks.length) {
                    const task = tasks[index];
                    index++;
                    return { value: task(), done: false };
                } else {
                    return { value: undefined, done: true };
                }
            }
        };
    }
};
async function processTasksWithConcurrencyLimit(tasks, limit) {
    const results = [];
    const queue = [];
    for await (const task of asyncTaskIterable) {
        queue.push(task);
        if (queue.length === limit) {
            const completedTask = await Promise.race(queue);
            results.push(completedTask);
            queue.splice(queue.indexOf(completedTask), 1);
        }
    }
    const remainingTasks = await Promise.all(queue);
    results.push(...remainingTasks);
    return results;
}
processTasksWithConcurrencyLimit(tasks, 3).then(results => {
    console.log(results);
});
// 这里会同时最多进行3个网络请求,处理完所有任务
  1. 使用 async/awaitsetTimeout 模拟并发控制
    • 另一种方式是使用 async/awaitsetTimeout 来模拟并发控制。
    • 例如:
const tasks = [
    () => new Promise(resolve => setTimeout(() => { resolve('Task 1 done'); }, 1000)),
    () => new Promise(resolve => setTimeout(() => { resolve('Task 2 done'); }, 1500)),
    () => new Promise(resolve => setTimeout(() => { resolve('Task 3 done'); }, 2000)),
    () => new Promise(resolve => setTimeout(() => { resolve('Task 4 done'); }, 2500)),
    () => new Promise(resolve => setTimeout(() => { resolve('Task 5 done'); }, 3000))
];
async function processTasksSequentiallyWithInterval(tasks, interval) {
    const results = [];
    for (const task of tasks) {
        const result = await task();
        results.push(result);
        await new Promise(resolve => setTimeout(resolve, interval));
    }
    return results;
}
processTasksSequentiallyWithInterval(tasks, 1000).then(results => {
    console.log(results);
});
// 这里会按顺序执行任务,每个任务之间间隔1秒

缓存与复用

  1. 缓存异步可迭代对象的结果
    • 如果异步可迭代对象的迭代结果不会频繁变化,我们可以缓存这些结果,以避免重复的异步操作。
    • 例如,创建一个异步可迭代对象,用于读取配置文件内容,并缓存结果:
const fs = require('fs/promises');
let cachedConfig;
const asyncConfigIterable = {
    [Symbol.asyncIterator]() {
        let index = 0;
        return {
            async next() {
                if (index === 0) {
                    if (!cachedConfig) {
                        cachedConfig = fs.readFile('config.json', 'utf8');
                    }
                    const config = await cachedConfig;
                    index++;
                    return { value: config, done: false };
                } else {
                    return { value: undefined, done: true };
                }
            }
        };
    }
};
- 使用 `for await...of` 循环读取配置:
(async () => {
    for await (const config of asyncConfigIterable) {
        console.log(config);
    }
    // 再次迭代,不会重复读取文件
    for await (const config of asyncConfigIterable) {
        console.log(config);
    }
})();
  1. 复用异步迭代器
    • 在某些情况下,我们可能希望复用异步迭代器,而不是每次都创建一个新的。
    • 例如,创建一个可复用的异步迭代器对象:
const asyncIterable = {
    data: [1, 2, 3],
    asyncIterator: null,
    [Symbol.asyncIterator]() {
        if (!this.asyncIterator) {
            let index = 0;
            this.asyncIterator = {
                async next() {
                    if (index < this.data.length) {
                        const value = this.data[index];
                        index++;
                        return { value, done: false };
                    } else {
                        return { value: undefined, done: true };
                    }
                }
            };
        }
        return this.asyncIterator;
    }
};
- 多次使用同一个异步迭代器:
(async () => {
    const iter1 = asyncIterable[Symbol.asyncIterator]();
    const iter2 = asyncIterable[Symbol.asyncIterator]();
    const result1 = await iter1.next();
    const result2 = await iter2.next();
    console.log(result1.value === result2.value); // 输出 true
})();

高级应用场景

异步数据流处理

  1. 数据过滤与转换
    • 假设我们有一个异步可迭代对象,包含一些数字,我们需要过滤掉偶数,并将剩下的奇数乘以2。
async function* numberGenerator() {
    yield 1;
    yield 2;
    yield 3;
    yield 4;
    yield 5;
}
async function* filterAndTransformAsyncIterable(asyncIterable) {
    for await (const value of asyncIterable) {
        if (value % 2!== 0) {
            yield value * 2;
        }
    }
}
(async () => {
    const filteredAndTransformed = filterAndTransformAsyncIterable(numberGenerator());
    for await (const result of filteredAndTransformed) {
        console.log(result);
    }
})();
// 这里会输出2, 6, 10
  1. 数据聚合
    • 假设有一个异步可迭代对象,包含一些数字,我们需要计算这些数字的总和。
async function* numberGeneratorForAggregation() {
    yield 1;
    yield 2;
    yield 3;
}
async function aggregateAsyncIterable(asyncIterable) {
    let sum = 0;
    for await (const value of asyncIterable) {
        sum += value;
    }
    return sum;
}
(async () => {
    const sum = await aggregateAsyncIterable(numberGeneratorForAggregation());
    console.log(sum);
})();
// 这里会输出6

分布式系统中的异步迭代

  1. 在分布式计算中处理任务
    • 在分布式系统中,可能有多个节点需要处理一系列任务。我们可以创建一个异步可迭代对象,将任务分发给不同的节点进行处理。
    • 假设我们有一个简单的分布式计算场景,每个节点通过网络请求接收任务并返回结果:
const nodeUrls = [
    'https://node1.example.com/process',
    'https://node2.example.com/process',
    'https://node3.example.com/process'
];
const tasks = [1, 2, 3, 4, 5];
const asyncTaskIterable = {
    [Symbol.asyncIterator]() {
        let taskIndex = 0;
        let nodeIndex = 0;
        return {
            async next() {
                if (taskIndex < tasks.length) {
                    const task = tasks[taskIndex];
                    const nodeUrl = nodeUrls[nodeIndex];
                    const response = await fetch(nodeUrl, {
                        method: 'POST',
                        headers: {
                            'Content-Type': 'application/json'
                        },
                        body: JSON.stringify({ task })
                    });
                    const result = await response.json();
                    taskIndex++;
                    nodeIndex = (nodeIndex + 1) % nodeUrls.length;
                    return { value: result, done: false };
                } else {
                    return { value: undefined, done: true };
                }
            }
        };
    }
};
- 使用 `for await...of` 循环处理任务结果:
(async () => {
    for await (const result of asyncTaskIterable) {
        console.log(result);
    }
})();
// 这里会将任务分发给不同节点处理,并输出结果
  1. 分布式数据同步
    • 在分布式系统中,数据同步是一个常见的需求。我们可以使用异步可迭代对象来同步不同节点的数据。
    • 假设每个节点有一个数据版本号和数据内容,我们需要将所有节点的数据同步到最新版本。
const nodeData = [
    { version: 1, data: 'Initial data' },
    { version: 2, data: 'Updated data' },
    { version: 3, data: 'Final data' }
];
const asyncNodeDataIterable = {
    [Symbol.asyncIterator]() {
        let index = 0;
        return {
            async next() {
                if (index < nodeData.length) {
                    const data = nodeData[index];
                    // 模拟网络同步操作,这里使用setTimeout
                    await new Promise(resolve => setTimeout(resolve, 1000));
                    index++;
                    return { value: data, done: false };
                } else {
                    return { value: undefined, done: true };
                }
            }
        };
    }
};
async function synchronizeData(asyncIterable) {
    let latestVersion = 0;
    let latestData;
    for await (const data of asyncIterable) {
        if (data.version > latestVersion) {
            latestVersion = data.version;
            latestData = data.data;
        }
    }
    return latestData;
}
(async () => {
    const synchronizedData = await synchronizeData(asyncNodeDataIterable);
    console.log(synchronizedData);
})();
// 这里会同步数据并输出最新的数据内容