MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

JavaScript数组迭代的并发实现

2021-07-255.4k 阅读

理解 JavaScript 数组迭代

在 JavaScript 中,数组迭代是一项非常常见的操作。传统的数组迭代方式,如 for 循环、forEach 方法等,都是顺序执行的。这意味着,在处理数组中的每一个元素时,前一个元素的操作完成后,才会开始处理下一个元素。

例如,使用 forEach 进行数组迭代:

const numbers = [1, 2, 3, 4, 5];
numbers.forEach((number) => {
    console.log(number);
});

上述代码会依次输出数组 numbers 中的每一个元素。在这个过程中,对 2 的处理必须等待对 1 的处理完成,对 3 的处理必须等待对 2 的处理完成,以此类推。

这种顺序迭代在很多场景下是适用的,但当数组元素的处理比较耗时,并且这些处理之间相互独立时,顺序迭代就会导致整体执行时间变长。比如,你需要同时向多个 API 发送请求,每个请求都基于数组中的一个元素,且这些请求之间不需要相互等待结果,这时并发执行就显得尤为重要。

并发的概念

并发(Concurrency)在计算机科学中,指的是在同一时间段内,多个任务可以交替执行。在 JavaScript 中,由于 JavaScript 是单线程的语言,它不能像多线程语言那样真正地同时执行多个任务。然而,JavaScript 通过事件循环(Event Loop)机制来模拟并发。

事件循环会不断地检查调用栈(Call Stack)是否为空。当调用栈为空时,它会从任务队列(Task Queue)中取出一个任务放入调用栈中执行。这样,即使某个任务执行时间较长,也不会阻塞其他任务的执行,从而实现了一种“假并发”的效果。

例如,考虑下面这段代码:

console.log('Start');
setTimeout(() => {
    console.log('Timeout');
}, 0);
console.log('End');

在这段代码中,setTimeout 会将其回调函数放入任务队列中。console.log('Start') 首先执行,然后 console.log('End') 执行,最后,当调用栈为空时,事件循环将 setTimeout 的回调函数从任务队列中取出并放入调用栈执行,输出 Timeout

传统的 JavaScript 并发实现方式

  1. Promise.all 与数组迭代
    • Promise.all 是 JavaScript 中用于处理多个 Promise 的方法。它接受一个 Promise 对象的数组作为参数,并返回一个新的 Promise。当所有输入的 Promise 都变为 resolved 状态时,这个新的 Promise 才会变为 resolved,并且其 resolved 的值是一个包含所有输入 Promise 结果的数组。
    • 假设我们有一个函数 fetchData,它返回一个 Promise,模拟从 API 获取数据:
function fetchData(id) {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve(`Data for ${id}`);
        }, Math.random() * 1000);
    });
}
const ids = [1, 2, 3];
Promise.all(ids.map((id) => fetchData(id)))
   .then((results) => {
        console.log(results);
    });
  • 在上述代码中,ids.map((id) => fetchData(id)) 创建了一个 Promise 数组,每个 Promise 对应一个 id 的数据获取操作。Promise.all 会并发执行这些 Promise,当所有 Promise 都完成后,将结果数组打印出来。
  1. Promise.race
    • Promise.race 同样接受一个 Promise 对象的数组作为参数,并返回一个新的 Promise。与 Promise.all 不同的是,Promise.race 会在数组中任何一个 Promise 变为 resolvedrejected 状态时,就将这个 Promise 的状态和值传递给返回的 Promise
    • 例如,我们可以用它来模拟多个请求中最快返回的那个:
function fetchData1() {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve('Data from fetchData1');
        }, 1500);
    });
}
function fetchData2() {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve('Data from fetchData2');
        }, 500);
    });
}
Promise.race([fetchData1(), fetchData2()])
   .then((result) => {
        console.log(result);
    });
  • 在这段代码中,fetchData2 会更快地返回结果,所以 Promise.race 返回的 Promise 会以 fetchData2 的结果 resolved,并打印出 Data from fetchData2

实现 JavaScript 数组迭代的并发

  1. 基本实现思路
    • 要实现数组迭代的并发,我们可以利用 Promise 和数组操作方法。一种常见的方法是将数组中的每个元素映射为一个 Promise,然后使用 Promise.all 来并发执行这些 Promise
    • 例如,假设我们有一个数组,数组中的每个元素都需要进行一个异步操作,比如读取文件或者发起网络请求。我们可以这样做:
function asyncOperation(item) {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve(`Processed ${item}`);
        }, Math.random() * 1000);
    });
}
const items = ['a', 'b', 'c'];
Promise.all(items.map((item) => asyncOperation(item)))
   .then((results) => {
        console.log(results);
    });
  • 在上述代码中,items.map((item) => asyncOperation(item)) 将数组 items 中的每个元素映射为一个 PromisePromise.all 会并发执行这些 Promise,并在所有 Promise 完成后将结果打印出来。
  1. 处理错误
    • 在实际应用中,异步操作可能会失败,我们需要处理这些错误。Promise.all 会在任何一个 Promiserejected 时,立即将返回的 Promiserejected
    • 我们修改一下前面的 asyncOperation 函数,使其有一定概率失败:
function asyncOperation(item) {
    return new Promise((resolve, reject) => {
        if (Math.random() > 0.5) {
            setTimeout(() => {
                resolve(`Processed ${item}`);
            }, Math.random() * 1000);
        } else {
            setTimeout(() => {
                reject(`Error processing ${item}`);
            }, Math.random() * 1000);
        }
    });
}
const items = ['a', 'b', 'c'];
Promise.all(items.map((item) => asyncOperation(item)))
   .then((results) => {
        console.log(results);
    })
   .catch((error) => {
        console.log(error);
    });
  • 在这段代码中,如果任何一个 asyncOperation 函数返回的 PromiserejectedPromise.all 返回的 Promise 也会被 rejected,并通过 .catch 块捕获错误。
  1. 控制并发数量
    • 在某些情况下,我们可能不希望同时并发执行所有的任务,因为这可能会耗尽系统资源,比如过多的网络请求可能会导致服务器过载。我们可以通过控制并发数量来解决这个问题。
    • 一种实现方式是使用队列和 Promise 来模拟有限并发:
function asyncOperation(item) {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve(`Processed ${item}`);
        }, Math.random() * 1000);
    });
}
function asyncMapLimit(arr, limit, callback) {
    return new Promise((resolve, reject) => {
        if (!arr.length) {
            return resolve([]);
        }
        const results = [];
        let completed = 0;
        const execute = () => {
            if (arr.length === 0 && completed === results.length) {
                return resolve(results);
            }
            const item = arr.shift();
            callback(item)
               .then((result) => {
                    results.push(result);
                    completed++;
                    execute();
                })
               .catch((error) => {
                    reject(error);
                });
        };
        for (let i = 0; i < limit && arr.length > 0; i++) {
            execute();
        }
    });
}
const items = ['a', 'b', 'c', 'd', 'e'];
asyncMapLimit(items, 2, asyncOperation)
   .then((results) => {
        console.log(results);
    });
  • 在上述代码中,asyncMapLimit 函数接受一个数组 arr、并发限制 limit 和一个异步回调函数 callback。它通过维护一个任务队列和已完成任务的计数,每次只执行不超过 limit 个任务,当一个任务完成后,再从队列中取出一个任务执行,直到所有任务完成。

高级并发控制

  1. 动态调整并发数量
    • 在一些复杂的场景中,我们可能需要根据系统的负载或者其他条件动态地调整并发数量。例如,我们可以根据网络状况来调整并发请求的数量。
    • 假设我们有一个函数 getNetworkStatus 来获取网络状态(这里只是模拟,实际中可以通过一些网络监测库实现),返回 'fast''slow'
function getNetworkStatus() {
    return Math.random() > 0.5? 'fast' :'slow';
}
function asyncOperation(item) {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve(`Processed ${item}`);
        }, Math.random() * 1000);
    });
}
function asyncMapDynamicLimit(arr, callback) {
    return new Promise((resolve, reject) => {
        if (!arr.length) {
            return resolve([]);
        }
        const results = [];
        let completed = 0;
        const networkStatus = getNetworkStatus();
        const limit = networkStatus === 'fast'? 5 : 2;
        const execute = () => {
            if (arr.length === 0 && completed === results.length) {
                return resolve(results);
            }
            const item = arr.shift();
            callback(item)
               .then((result) => {
                    results.push(result);
                    completed++;
                    execute();
                })
               .catch((error) => {
                    reject(error);
                });
        };
        for (let i = 0; i < limit && arr.length > 0; i++) {
            execute();
        }
    });
}
const items = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'];
asyncMapDynamicLimit(items, asyncOperation)
   .then((results) => {
        console.log(results);
    });
  • 在这段代码中,asyncMapDynamicLimit 函数根据网络状态动态地调整并发数量。如果网络状态为 'fast',并发数量设置为 5;如果为 'slow',并发数量设置为 2
  1. 并发与顺序结合
    • 有时候,我们可能需要一部分任务并发执行,而另一部分任务顺序执行。例如,在一个数据处理流程中,前半部分的数组元素处理可以并发执行,后半部分需要按顺序执行,以确保数据的一致性。
    • 我们可以这样实现:
function asyncOperation1(item) {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve(`Processed1 ${item}`);
        }, Math.random() * 1000);
    });
}
function asyncOperation2(item) {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve(`Processed2 ${item}`);
        }, Math.random() * 1000);
    });
}
function combinedOperation(arr) {
    return new Promise((resolve, reject) => {
        const mid = Math.floor(arr.length / 2);
        const firstHalf = arr.slice(0, mid);
        const secondHalf = arr.slice(mid);
        Promise.all(firstHalf.map((item) => asyncOperation1(item)))
           .then((firstResults) => {
                const sequentialPromises = secondHalf.map((item, index) => {
                    return new Promise((innerResolve, innerReject) => {
                        if (index === 0) {
                            asyncOperation2(item)
                               .then((result) => {
                                    innerResolve(result);
                                })
                               .catch((error) => {
                                    innerReject(error);
                                });
                        } else {
                            sequentialPromises[index - 1]
                               .then(() => {
                                    asyncOperation2(item)
                                       .then((result) => {
                                            innerResolve(result);
                                        })
                                       .catch((error) => {
                                            innerReject(error);
                                        });
                                })
                               .catch((error) => {
                                    innerReject(error);
                                });
                        }
                    });
                });
                Promise.all(sequentialPromises)
                   .then((secondResults) => {
                        resolve([...firstResults, ...secondResults]);
                    })
                   .catch((error) => {
                        reject(error);
                    });
            })
           .catch((error) => {
                reject(error);
            });
    });
}
const items = ['a', 'b', 'c', 'd', 'e'];
combinedOperation(items)
   .then((results) => {
        console.log(results);
    });
  • 在上述代码中,数组被分成两半,前半部分的元素通过 Promise.all 并发执行 asyncOperation1,后半部分的元素顺序执行 asyncOperation2,最后将两部分的结果合并返回。

性能分析与优化

  1. 并发数量对性能的影响
    • 并发数量的选择对性能有着重要的影响。如果并发数量设置得过低,任务执行时间会因为等待而变长;如果并发数量设置得过高,可能会导致资源耗尽,如内存不足或者网络拥塞。
    • 我们可以通过一些性能测试工具来分析不同并发数量下的性能表现。例如,使用 benchmark 库:
const Benchmark = require('benchmark');
function asyncOperation(item) {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve(`Processed ${item}`);
        }, Math.random() * 1000);
    });
}
function asyncMapLimit(arr, limit, callback) {
    return new Promise((resolve, reject) => {
        if (!arr.length) {
            return resolve([]);
        }
        const results = [];
        let completed = 0;
        const execute = () => {
            if (arr.length === 0 && completed === results.length) {
                return resolve(results);
            }
            const item = arr.shift();
            callback(item)
               .then((result) => {
                    results.push(result);
                    completed++;
                    execute();
                })
               .catch((error) => {
                    reject(error);
                });
        };
        for (let i = 0; i < limit && arr.length > 0; i++) {
            execute();
        }
    });
}
const suite = new Benchmark.Suite;
const items = Array.from({ length: 10 }, (_, i) => i + 1);
suite
   .add('Limit 2', () => {
        asyncMapLimit(items, 2, asyncOperation);
    })
   .add('Limit 5', () => {
        asyncMapLimit(items, 5, asyncOperation);
    })
   .add('Limit 10', () => {
        asyncMapLimit(items, 10, asyncOperation);
    })
   .on('cycle', function (event) {
        console.log(String(event.target));
    })
   .on('complete', function () {
        console.log('Fastest is'+ this.filter('fastest').map('name'));
    })
   .run({ 'async': true });
  • 上述代码使用 benchmark 库对不同并发限制(2510)下的 asyncMapLimit 函数进行性能测试。通过分析测试结果,可以找到在特定场景下最优的并发数量。
  1. 优化策略
    • 复用资源:在并发执行异步操作时,如果有一些资源可以复用,比如数据库连接或者网络套接字,可以减少资源创建和销毁的开销。例如,在进行多个数据库查询时,可以复用一个数据库连接池。
    • 缓存结果:如果某些异步操作的结果是相同的或者不会经常变化,可以将结果缓存起来。这样,当再次需要执行相同的异步操作时,可以直接从缓存中获取结果,而不需要重新执行。例如,在请求一些不经常更新的配置数据时,可以将其缓存。
    • 合理使用 async/await:虽然 async/await 使得异步代码看起来像同步代码,但过度使用可能会影响性能。在并发操作中,尽量使用 Promise 的链式调用和 Promise.all 等方法,以充分利用 JavaScript 的事件循环机制。

实际应用场景

  1. 数据爬取
    • 在数据爬取任务中,通常需要从多个网页获取数据。每个网页的爬取操作可以看作是一个独立的异步任务。通过并发执行这些任务,可以大大提高爬取效率。
    • 例如,使用 axios 库进行网页数据爬取:
const axios = require('axios');
function fetchPage(url) {
    return axios.get(url)
       .then((response) => {
            return response.data;
        });
}
const urls = ['https://example.com/page1', 'https://example.com/page2', 'https://example.com/page3'];
Promise.all(urls.map((url) => fetchPage(url)))
   .then((results) => {
        console.log(results);
    });
  • 在上述代码中,urls 数组包含多个网页的 URL,Promise.all 并发执行每个 fetchPage 操作,同时获取多个网页的数据。
  1. 文件处理
    • 当需要对多个文件进行处理时,比如读取文件内容、转换文件格式等,这些操作可以并发执行。假设我们使用 fs 模块和 util.promisify 来处理文件:
const { promisify } = require('util');
const fs = require('fs');
const readFile = promisify(fs.readFile);
function processFile(filePath) {
    return readFile(filePath, 'utf8')
       .then((content) => {
            // 对文件内容进行处理,这里只是简单返回内容长度
            return content.length;
        });
}
const filePaths = ['file1.txt', 'file2.txt', 'file3.txt'];
Promise.all(filePaths.map((filePath) => processFile(filePath)))
   .then((results) => {
        console.log(results);
    });
  • 在这段代码中,filePaths 数组包含多个文件路径,Promise.all 并发执行每个 processFile 操作,同时获取多个文件内容的长度。

通过以上对 JavaScript 数组迭代并发实现的详细讲解,包括基本概念、实现方式、高级控制、性能分析和实际应用场景,希望能帮助你在开发中更好地利用并发技术,提高程序的执行效率。