MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Node.js 并发控制与异步任务队列管理

2023-05-093.6k 阅读

1. 理解 Node.js 中的异步编程

在深入探讨并发控制与异步任务队列管理之前,我们需要先理解 Node.js 异步编程的基本概念。Node.js 以其单线程事件循环机制而闻名,这使得它能够高效地处理 I/O 密集型任务。

1.1 回调函数

回调函数是 Node.js 异步编程中最基础的方式。例如,读取文件的操作:

const fs = require('fs');

fs.readFile('example.txt', 'utf8', (err, data) => {
    if (err) {
        console.error(err);
        return;
    }
    console.log(data);
});

在上述代码中,fs.readFile 是一个异步操作,它接受一个回调函数作为参数。当文件读取操作完成后,Node.js 的事件循环会将回调函数放入事件队列,等待主线程执行。

1.2 Promise

Promise 是一种更优雅的处理异步操作的方式,它解决了回调地狱的问题。例如,使用 fs/promises 模块来读取文件:

const fs = require('fs/promises');

fs.readFile('example.txt', 'utf8')
   .then(data => {
        console.log(data);
    })
   .catch(err => {
        console.error(err);
    });

Promise 有三种状态:pending(进行中)、fulfilled(已成功)和 rejected(已失败)。当异步操作成功时,Promise 会进入 fulfilled 状态并调用 then 方法中的回调;当异步操作失败时,Promise 会进入 rejected 状态并调用 catch 方法中的回调。

1.3 Async/Await

Async/Await 是基于 Promise 的语法糖,使得异步代码看起来更像同步代码。

const fs = require('fs/promises');

async function readFileContent() {
    try {
        const data = await fs.readFile('example.txt', 'utf8');
        console.log(data);
    } catch (err) {
        console.error(err);
    }
}

readFileContent();

async 关键字用于定义一个异步函数,该函数总是返回一个 Promise。await 关键字只能在 async 函数内部使用,它会暂停当前 async 函数的执行,等待 Promise 被解决(resolved 或 rejected),然后恢复执行并返回 Promise 的值。

2. 并发控制的概念与需求

2.1 什么是并发控制

并发控制是指在程序执行过程中,对多个任务的执行顺序和执行资源进行管理,以确保系统的稳定性、性能和正确性。在 Node.js 中,由于单线程事件循环的特性,虽然不能真正意义上的并行执行多个任务(多核 CPU 并行),但可以通过并发控制来模拟并行效果,提高系统的整体效率。

2.2 为什么需要并发控制

  • 资源限制:例如网络带宽、文件系统 I/O 吞吐量等资源是有限的。如果同时发起过多的网络请求或文件读写操作,可能会导致资源耗尽,使程序崩溃或性能急剧下降。
  • 性能优化:合理地控制并发数量,可以避免过多任务竞争资源,从而提高整体的执行效率。例如,在爬取多个网页数据时,若并发数过高,可能会导致网络拥堵,反而降低了数据获取速度。

3. 基于 Promise 的并发控制

3.1 Promise.all

Promise.all 是一个用于处理多个 Promise 的方法,它接受一个 Promise 数组作为参数,并返回一个新的 Promise。只有当所有传入的 Promise 都成功解决(resolved)时,返回的 Promise 才会成功解决,其值为所有传入 Promise 解决值的数组。如果其中任何一个 Promise 被拒绝(rejected),返回的 Promise 就会立即被拒绝,其原因是第一个被拒绝的 Promise 的原因。

const fs = require('fs/promises');

const files = ['file1.txt', 'file2.txt', 'file3.txt'];
const promises = files.map(file => fs.readFile(file, 'utf8'));

Promise.all(promises)
   .then(dataArray => {
        dataArray.forEach((data, index) => {
            console.log(`Content of ${files[index]}:`, data);
        });
    })
   .catch(err => {
        console.error(err);
    });

在上述代码中,我们通过 map 方法创建了一个 Promise 数组,每个 Promise 负责读取一个文件。然后使用 Promise.all 等待所有文件读取操作完成。如果任何一个文件读取失败,整个操作就会失败。

3.2 Promise.race

Promise.race 同样接受一个 Promise 数组作为参数,并返回一个新的 Promise。只要数组中的任何一个 Promise 成功解决或被拒绝,返回的 Promise 就会立即以相同的状态和值被解决或被拒绝。

const fs = require('fs/promises');

const files = ['file1.txt', 'file2.txt', 'file3.txt'];
const promises = files.map(file => fs.readFile(file, 'utf8'));

Promise.race(promises)
   .then(data => {
        console.log('First resolved file content:', data);
    })
   .catch(err => {
        console.error(err);
    });

上述代码中,Promise.race 会等待第一个完成的文件读取操作,并返回其结果。这在一些需要快速获取某个结果的场景中非常有用,比如多个 API 调用,只需要第一个返回的结果。

3.3 控制并发数量

虽然 Promise.allPromise.race 很有用,但它们没有直接提供控制并发数量的功能。我们可以通过手动实现来控制并发数量。

function limitConcurrent(promises, limit) {
    return new Promise((resolve, reject) => {
        if (!Array.isArray(promises) ||!Number.isInteger(limit) || limit <= 0) {
            return reject(new Error('Invalid input'));
        }

        const results = [];
        let completedCount = 0;
        const runningPromises = [];

        function next() {
            if (promises.length === 0 && runningPromises.length === 0) {
                return resolve(results);
            }

            while (runningPromises.length < limit && promises.length > 0) {
                const promise = promises.shift();
                const index = results.length;
                runningPromises.push(promise
                   .then(value => {
                        results[index] = value;
                        completedCount++;
                        runningPromises.splice(runningPromises.indexOf(promise), 1);
                        next();
                    })
                   .catch(err => {
                        reject(err);
                    })
                );
            }
        }

        next();
    });
}

const fs = require('fs/promises');
const files = ['file1.txt', 'file2.txt', 'file3.txt', 'file4.txt', 'file5.txt'];
const promises = files.map(file => fs.readFile(file, 'utf8'));

limitConcurrent(promises, 2)
   .then(dataArray => {
        dataArray.forEach((data, index) => {
            console.log(`Content of ${files[index]}:`, data);
        });
    })
   .catch(err => {
        console.error(err);
    });

在上述代码中,limitConcurrent 函数接受一个 Promise 数组和并发限制数量 limit。它通过维护一个正在运行的 Promise 数组 runningPromises 来控制并发数量,当有 Promise 完成时,会从 runningPromises 中移除并启动下一个 Promise。

4. 异步任务队列管理

4.1 任务队列的基本概念

任务队列是一种数据结构,用于存储待执行的任务。在 Node.js 中,任务队列通常用于管理异步任务,确保任务按照一定的顺序或策略执行。任务队列可以是先进先出(FIFO)的,也可以根据任务的优先级等因素进行排序。

4.2 简单的任务队列实现

class TaskQueue {
    constructor() {
        this.tasks = [];
        this.isRunning = false;
    }

    addTask(task) {
        this.tasks.push(task);
        this.run();
    }

    run() {
        if (this.isRunning || this.tasks.length === 0) {
            return;
        }

        this.isRunning = true;
        const task = this.tasks.shift();
        task()
           .then(() => {
                this.isRunning = false;
                this.run();
            })
           .catch(err => {
                console.error(err);
                this.isRunning = false;
                this.run();
            });
    }
}

const queue = new TaskQueue();

function asyncTask() {
    return new Promise((resolve) => {
        setTimeout(() => {
            console.log('Task completed');
            resolve();
        }, 1000);
    });
}

queue.addTask(asyncTask);
queue.addTask(asyncTask);

在上述代码中,TaskQueue 类实现了一个简单的任务队列。addTask 方法用于将任务添加到队列中,run 方法用于执行队列中的任务。任务按照添加的顺序依次执行。

4.3 带并发控制的任务队列

class LimitedTaskQueue {
    constructor(concurrency) {
        this.tasks = [];
        this.runningCount = 0;
        this.concurrency = concurrency;
    }

    addTask(task) {
        this.tasks.push(task);
        this.run();
    }

    run() {
        while (this.runningCount < this.concurrency && this.tasks.length > 0) {
            const task = this.tasks.shift();
            this.runningCount++;
            task()
               .then(() => {
                    this.runningCount--;
                    this.run();
                })
               .catch(err => {
                    console.error(err);
                    this.runningCount--;
                    this.run();
                });
        }
    }
}

const queue = new LimitedTaskQueue(2);

function asyncTask() {
    return new Promise((resolve) => {
        setTimeout(() => {
            console.log('Task completed');
            resolve();
        }, 1000);
    });
}

queue.addTask(asyncTask);
queue.addTask(asyncTask);
queue.addTask(asyncTask);

LimitedTaskQueue 类扩展了 TaskQueue,增加了并发控制功能。通过 concurrency 属性控制同时执行的任务数量,runningCount 记录当前正在执行的任务数量。

5. 使用第三方库进行并发控制与任务队列管理

5.1 async 库

async 库是一个非常流行的用于处理异步操作的库,它提供了丰富的方法来管理并发和任务队列。

const async = require('async');
const fs = require('fs');

const files = ['file1.txt', 'file2.txt', 'file3.txt'];

async.mapLimit(files, 2, (file, callback) => {
    fs.readFile(file, 'utf8', (err, data) => {
        if (err) {
            callback(err);
        } else {
            callback(null, data);
        }
    });
}, (err, results) => {
    if (err) {
        console.error(err);
    } else {
        results.forEach((data, index) => {
            console.log(`Content of ${files[index]}:`, data);
        });
    }
});

async.mapLimit 方法与 Promise.all 类似,但可以控制并发数量。第一个参数是要处理的数组,第二个参数是并发限制数量,第三个参数是处理每个元素的异步函数,最后一个参数是所有任务完成后的回调。

5.2 p-limit

p-limit 是一个专注于并发控制的库,它基于 Promise 实现。

const Limit = require('p-limit');
const fs = require('fs/promises');

const limit = new Limit(2);
const files = ['file1.txt', 'file2.txt', 'file3.txt'];

const promises = files.map(file => limit(() => fs.readFile(file, 'utf8')));

Promise.all(promises)
   .then(dataArray => {
        dataArray.forEach((data, index) => {
            console.log(`Content of ${files[index]}:`, data);
        });
    })
   .catch(err => {
        console.error(err);
    });

p-limit 创建一个限制并发的函数 limit,通过将异步任务包装在 limit 函数中,可以控制并发数量。

6. 并发控制与任务队列管理的实际应用场景

6.1 网络爬虫

在网络爬虫中,需要从多个网页获取数据。如果同时发起过多的请求,可能会导致目标服务器拒绝服务或网络拥堵。通过并发控制和任务队列管理,可以合理地控制请求数量,确保爬虫稳定高效运行。

const axios = require('axios');
const async = require('async');

const urls = ['https://example.com', 'https://another-example.com', 'https://third-example.com'];

async.mapLimit(urls, 3, (url, callback) => {
    axios.get(url)
       .then(response => {
            callback(null, response.data);
        })
       .catch(err => {
            callback(err);
        });
}, (err, results) => {
    if (err) {
        console.error(err);
    } else {
        results.forEach((data, index) => {
            console.log(`Data from ${urls[index]}:`, data);
        });
    }
});

上述代码使用 async 库的 mapLimit 方法控制并发请求数量,从多个 URL 获取数据。

6.2 文件处理

在处理大量文件时,如读取、写入或转换文件格式,合理的并发控制可以提高处理效率。例如,批量压缩图片文件:

const sharp = require('sharp');
const async = require('async');
const fs = require('fs');

const inputFiles = ['image1.jpg', 'image2.jpg', 'image3.jpg'];
const outputDir = 'compressed_images/';

async.mapLimit(inputFiles, 2, (inputFile, callback) => {
    const outputFile = outputDir + inputFile;
    sharp(inputFile)
       .jpeg({ quality: 80 })
       .toFile(outputFile)
       .then(() => {
            callback(null);
        })
       .catch(err => {
            callback(err);
        });
}, (err) => {
    if (err) {
        console.error(err);
    } else {
        console.log('All images compressed successfully');
    }
});

此代码使用 async 库控制并发处理图片压缩任务,避免同时处理过多文件导致系统资源耗尽。

7. 并发控制与任务队列管理的性能优化

7.1 调整并发数量

并发数量并非越高越好,需要根据系统资源和任务特性进行调整。例如,在网络请求任务中,过高的并发可能导致网络拥堵,而在 CPU 密集型任务中,过高的并发可能导致 CPU 资源竞争。可以通过性能测试工具(如 benchmark 库)来确定最优的并发数量。

const Benchmark = require('benchmark');
const async = require('async');
const axios = require('axios');

const suite = new Benchmark.Suite;
const urls = Array.from({ length: 10 }, (_, i) => `https://example.com/api/${i}`);

const concurrencyLevels = [1, 2, 4, 8, 16];

concurrencyLevels.forEach(concurrency => {
    suite.add(`Concurrency ${concurrency}`, function () {
        async.mapLimit(urls, concurrency, (url, callback) => {
            axios.get(url)
               .then(() => {
                    callback(null);
                })
               .catch(err => {
                    callback(err);
                });
        }, () => {});
    });
});

suite
   .on('cycle', function (event) {
        console.log(String(event.target));
    })
   .on('complete', function () {
        console.log('Fastest is'+ this.filter('fastest').map('name'));
    })
   .run({ 'async': true });

上述代码使用 benchmark 库对不同并发数量下的网络请求性能进行测试,以找到最优的并发数量。

7.2 任务优先级管理

在任务队列中,可以根据任务的优先级进行排序。例如,在一个文件处理系统中,紧急文件的处理优先级高于普通文件。

class PriorityTaskQueue {
    constructor() {
        this.tasks = [];
        this.isRunning = false;
    }

    addTask(task, priority) {
        this.tasks.push({ task, priority });
        this.tasks.sort((a, b) => b.priority - a.priority);
        this.run();
    }

    run() {
        if (this.isRunning || this.tasks.length === 0) {
            return;
        }

        this.isRunning = true;
        const { task } = this.tasks.shift();
        task()
           .then(() => {
                this.isRunning = false;
                this.run();
            })
           .catch(err => {
                console.error(err);
                this.isRunning = false;
                this.run();
            });
    }
}

const queue = new PriorityTaskQueue();

function highPriorityTask() {
    return new Promise((resolve) => {
        setTimeout(() => {
            console.log('High priority task completed');
            resolve();
        }, 1000);
    });
}

function lowPriorityTask() {
    return new Promise((resolve) => {
        setTimeout(() => {
            console.log('Low priority task completed');
            resolve();
        }, 2000);
    });
}

queue.addTask(highPriorityTask, 2);
queue.addTask(lowPriorityTask, 1);

在上述代码中,PriorityTaskQueue 类根据任务的优先级对任务进行排序,优先执行高优先级任务。

8. 错误处理与健壮性

8.1 并发任务中的错误处理

在并发控制和任务队列管理中,错误处理至关重要。当一个任务失败时,需要根据具体情况决定是否继续执行其他任务。例如,在 Promise.all 中,一个 Promise 失败会导致整个操作失败。但在某些场景下,我们可能希望忽略个别任务的失败,继续执行其他任务。

const fs = require('fs/promises');

const files = ['file1.txt', 'non - existent.txt', 'file3.txt'];
const promises = files.map(file => fs.readFile(file, 'utf8').catch(err => null));

Promise.all(promises)
   .then(dataArray => {
        dataArray.forEach((data, index) => {
            if (data === null) {
                console.log(`Failed to read ${files[index]}`);
            } else {
                console.log(`Content of ${files[index]}:`, data);
            }
        });
    });

在上述代码中,通过在每个 Promise 上添加 catch 方法,捕获单个文件读取失败的错误,并继续执行其他文件的读取操作。

8.2 任务队列中的错误处理

在任务队列中,同样需要处理任务执行过程中的错误。当一个任务失败时,需要确保队列的状态不受影响,并且能够继续执行后续任务。

class TaskQueueWithErrorHandling {
    constructor() {
        this.tasks = [];
        this.isRunning = false;
    }

    addTask(task) {
        this.tasks.push(task);
        this.run();
    }

    run() {
        if (this.isRunning || this.tasks.length === 0) {
            return;
        }

        this.isRunning = true;
        const task = this.tasks.shift();
        task()
           .then(() => {
                this.isRunning = false;
                this.run();
            })
           .catch(err => {
                console.error(err);
                this.isRunning = false;
                this.run();
            });
    }
}

const queue = new TaskQueueWithErrorHandling();

function failingTask() {
    return new Promise((_, reject) => {
        setTimeout(() => {
            reject(new Error('Task failed'));
        }, 1000);
    });
}

function succeedingTask() {
    return new Promise((resolve) => {
        setTimeout(() => {
            console.log('Task succeeded');
            resolve();
        }, 1000);
    });
}

queue.addTask(failingTask);
queue.addTask(succeedingTask);

在上述代码中,TaskQueueWithErrorHandling 类在任务执行失败时,捕获错误并继续执行队列中的下一个任务,确保队列的正常运行。

通过深入理解并发控制与异步任务队列管理,我们可以更好地利用 Node.js 的单线程事件循环机制,开发出高效、稳定的应用程序。无论是在网络爬虫、文件处理还是其他 I/O 密集型场景中,合理的并发控制和任务队列管理都能显著提升系统性能。同时,要注意错误处理和性能优化,以确保应用程序的健壮性和高效性。