MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

JavaScript在Node编程中的并发处理

2022-03-034.1k 阅读

JavaScript 中的并发概念

在深入探讨 Node 编程中的并发处理之前,我们先来明确一下并发的概念。并发是指在同一时间段内,多个任务看似同时执行。这并不意味着它们在真正意义上是同时执行的(在单处理器系统中,同一时刻实际上只能执行一个任务),而是通过快速地切换任务执行,给人一种同时执行的错觉。

在 JavaScript 中,由于其单线程的特性,并发处理尤为重要。单线程意味着同一时间只能执行一个代码块。如果一个任务长时间运行(例如一个复杂的计算或者一个缓慢的 I/O 操作),它会阻塞线程,导致其他任务无法执行,用户界面(在浏览器环境下)可能会变得无响应。为了解决这个问题,JavaScript 引入了异步编程的概念,而这正是实现并发处理的关键。

事件循环机制

JavaScript 实现并发的核心机制是事件循环(Event Loop)。事件循环是一个持续运行的循环,它不断检查调用栈(Call Stack)是否为空。当调用栈为空时,它会从任务队列(Task Queue)中取出一个任务放入调用栈中执行。

调用栈是一个存储函数调用的栈结构。当一个函数被调用时,它的执行上下文会被压入调用栈;当函数执行完毕,其执行上下文会从调用栈中弹出。例如:

function add(a, b) {
    return a + b;
}

function multiply(a, b) {
    let sum = add(a, b);
    return sum * 2;
}

multiply(2, 3);

在这个例子中,当 multiply 函数被调用时,它的执行上下文被压入调用栈。在 multiply 函数内部调用 add 函数时,add 函数的执行上下文也被压入调用栈。当 add 函数执行完毕返回结果后,其执行上下文从调用栈中弹出。multiply 函数继续执行并返回最终结果,然后 multiply 函数的执行上下文也从调用栈中弹出。

任务队列则是一个存储待执行任务的队列。当一个异步任务(例如 setTimeout 回调函数或者 I/O 操作完成后的回调函数)准备好执行时,它会被放入任务队列。事件循环不断检查调用栈,一旦调用栈为空,就会从任务队列中取出一个任务放入调用栈执行。例如:

console.log('Start');
setTimeout(() => {
    console.log('Timeout callback');
}, 1000);
console.log('End');

在这个例子中,首先 console.log('Start') 被执行,然后 setTimeout 被调用。setTimeout 并不会立即执行回调函数,而是将其放入任务队列。接着 console.log('End') 被执行。当调用栈为空时,事件循环从任务队列中取出 setTimeout 的回调函数放入调用栈执行,输出 Timeout callback

异步编程模型

JavaScript 提供了几种异步编程模型来实现并发处理,主要包括回调函数(Callbacks)、Promise 和 async/await。

回调函数

回调函数是最基本的异步编程方式。当一个异步操作完成时,会调用预先定义好的回调函数。例如,在 Node 中读取文件的操作:

const fs = require('fs');

fs.readFile('example.txt', 'utf8', (err, data) => {
    if (err) {
        console.error(err);
        return;
    }
    console.log(data);
});

在这个例子中,fs.readFile 是一个异步操作。它接收文件名、编码格式以及一个回调函数作为参数。当文件读取完成后,会调用回调函数,并将错误对象(如果有错误)和读取到的数据作为参数传递给回调函数。

然而,回调函数存在一个问题,就是当有多个异步操作需要顺序执行时,代码会变得难以阅读和维护,这种情况被称为“回调地狱”(Callback Hell)。例如:

fs.readFile('file1.txt', 'utf8', (err1, data1) => {
    if (err1) {
        console.error(err1);
        return;
    }
    fs.readFile('file2.txt', 'utf8', (err2, data2) => {
        if (err2) {
            console.error(err2);
            return;
        }
        fs.readFile('file3.txt', 'utf8', (err3, data3) => {
            if (err3) {
                console.error(err3);
                return;
            }
            console.log(data1 + data2 + data3);
        });
    });
});

可以看到,随着异步操作的增多,代码的缩进越来越深,可读性变差。

Promise

Promise 是一种更优雅的异步编程方式,它可以解决回调地狱的问题。Promise 表示一个异步操作的最终完成(或失败)及其结果值。一个 Promise 有三种状态:pending(进行中)、fulfilled(已成功)和 rejected(已失败)。

例如,我们可以将前面读取文件的操作封装成一个 Promise:

const fs = require('fs');
const util = require('util');

const readFilePromise = util.promisify(fs.readFile);

readFilePromise('example.txt', 'utf8')
   .then(data => {
        console.log(data);
    })
   .catch(err => {
        console.error(err);
    });

在这个例子中,util.promisify 函数将 fs.readFile 这个基于回调的函数转换成了返回 Promise 的函数。readFilePromise 调用返回一个 Promise,我们可以使用 .then() 方法来处理成功的情况,使用 .catch() 方法来处理失败的情况。

当需要顺序执行多个异步操作时,Promise 的链式调用使得代码更加清晰:

readFilePromise('file1.txt', 'utf8')
   .then(data1 => {
        return readFilePromise('file2.txt', 'utf8').then(data2 => {
            return readFilePromise('file3.txt', 'utf8').then(data3 => {
                return data1 + data2 + data3;
            });
        });
    })
   .then(finalData => {
        console.log(finalData);
    })
   .catch(err => {
        console.error(err);
    });

虽然链式调用比回调地狱有所改善,但多层嵌套仍然不够直观。

async/await

async/await 是基于 Promise 的语法糖,它让异步代码看起来更像同步代码,进一步提高了代码的可读性。async 函数总是返回一个 Promise。在 async 函数内部,可以使用 await 关键字暂停函数的执行,直到 Promise 被解决(resolved)或被拒绝(rejected)。

例如,使用 async/await 重写前面顺序读取文件的代码:

async function readFilesSequentially() {
    try {
        let data1 = await readFilePromise('file1.txt', 'utf8');
        let data2 = await readFilePromise('file2.txt', 'utf8');
        let data3 = await readFilePromise('file3.txt', 'utf8');
        console.log(data1 + data2 + data3);
    } catch (err) {
        console.error(err);
    }
}

readFilesSequentially();

在这个例子中,await 使得代码等待每个文件读取操作完成后再继续执行下一个操作。try...catch 块用于捕获和处理可能出现的错误。这样的代码结构与同步代码非常相似,大大提高了可读性。

Node 中的并发处理场景

Node.js 作为一个基于 Chrome V8 引擎的 JavaScript 运行时,特别适合处理 I/O 密集型任务,这得益于其单线程异步 I/O 模型。在 Node 中,有许多场景需要进行并发处理。

I/O 操作并发

I/O 操作(如文件读写、网络请求等)通常是比较耗时的。在 Node 中,通过异步 I/O 操作和并发处理,可以显著提高应用程序的性能。

文件读写并发

假设我们有多个文件需要读取,并且这些文件之间没有依赖关系,我们可以并发读取这些文件以提高效率。使用 Promise.all 可以很方便地实现这一点:

const fs = require('fs');
const util = require('util');

const readFilePromise = util.promisify(fs.readFile);

const fileNames = ['file1.txt', 'file2.txt', 'file3.txt'];

const promises = fileNames.map(fileName => readFilePromise(fileName, 'utf8'));

Promise.all(promises)
   .then(dataArray => {
        let combinedData = '';
        dataArray.forEach(data => {
            combinedData += data;
        });
        console.log(combinedData);
    })
   .catch(err => {
        console.error(err);
    });

在这个例子中,map 方法将每个文件名映射为一个读取文件的 Promise,形成一个 Promise 数组。Promise.all 接收这个 Promise 数组,并返回一个新的 Promise。当所有的 Promise 都被解决(resolved)时,新的 Promise 才会被解决,并将所有 Promise 的结果以数组的形式传递给 .then() 回调函数。如果其中任何一个 Promise 被拒绝(rejected),整个 Promise.all 就会被拒绝。

网络请求并发

在 Node 中进行网络请求(例如使用 httpaxios 库)时,也经常需要并发处理。以 axios 为例:

const axios = require('axios');

const urls = ['https://example.com/api1', 'https://example.com/api2', 'https://example.com/api3'];

const requests = urls.map(url => axios.get(url));

Promise.all(requests)
   .then(responses => {
        responses.forEach(response => {
            console.log(response.data);
        });
    })
   .catch(err => {
        console.error(err);
    });

这里同样使用 map 方法创建一个包含多个网络请求的 Promise 数组,然后通过 Promise.all 并发执行这些请求,并在所有请求完成后处理响应。

处理大量任务

在一些应用场景中,可能需要处理大量的任务,例如批量数据处理。如果逐个顺序处理这些任务,会花费很长时间。通过并发处理,可以将任务分配到不同的时间段执行,提高整体效率。

假设我们有一个数组,需要对每个元素进行一些复杂的计算,并且希望并发处理这些计算:

function complexCalculation(num) {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve(num * num);
        }, 1000);
    });
}

const numbers = [1, 2, 3, 4, 5];

const tasks = numbers.map(num => complexCalculation(num));

Promise.all(tasks)
   .then(results => {
        console.log(results);
    })
   .catch(err => {
        console.error(err);
    });

在这个例子中,complexCalculation 函数模拟了一个耗时的计算任务,返回一个 Promise。map 方法将数组中的每个数字映射为一个执行计算的 Promise,Promise.all 用于并发执行这些任务并获取结果。

控制并发度

虽然并发处理可以提高效率,但如果并发的任务过多,可能会导致系统资源耗尽,例如内存不足或网络拥塞。因此,需要控制并发度,即同时执行的任务数量。

使用队列和 Promise

一种常见的控制并发度的方法是使用队列和 Promise。我们可以创建一个任务队列,每次从队列中取出一定数量的任务并发执行,当一个任务完成后,再从队列中取出一个新的任务执行,以保持并发度的稳定。

class TaskQueue {
    constructor(concurrency) {
        this.concurrency = concurrency;
        this.tasks = [];
        this.running = 0;
    }

    addTask(task) {
        return new Promise((resolve, reject) => {
            this.tasks.push({ task, resolve, reject });
            this._processQueue();
        });
    }

    _processQueue() {
        while (this.running < this.concurrency && this.tasks.length > 0) {
            const { task, resolve, reject } = this.tasks.shift();
            this.running++;
            task()
               .then(result => {
                    resolve(result);
                })
               .catch(error => {
                    reject(error);
                })
               .finally(() => {
                    this.running--;
                    this._processQueue();
                });
        }
    }
}

// 使用示例
const taskQueue = new TaskQueue(2);

function task1() {
    return new Promise((resolve) => {
        setTimeout(() => {
            console.log('Task 1 completed');
            resolve();
        }, 2000);
    });
}

function task2() {
    return new Promise((resolve) => {
        setTimeout(() => {
            console.log('Task 2 completed');
            resolve();
        }, 1000);
    });
}

function task3() {
    return new Promise((resolve) => {
        setTimeout(() => {
            console.log('Task 3 completed');
            resolve();
        }, 3000);
    });
}

taskQueue.addTask(task1);
taskQueue.addTask(task2);
taskQueue.addTask(task3);

在这个例子中,TaskQueue 类实现了一个任务队列。concurrency 属性表示最大并发度,tasks 数组存储待执行的任务,running 变量记录当前正在执行的任务数量。addTask 方法将任务添加到队列中,并返回一个 Promise。_processQueue 方法负责从队列中取出任务并执行,当一个任务完成后,更新 running 变量并继续处理队列中的下一个任务。

使用 async/await 和 for...of 循环

另一种方式是使用 async/await 和 for...of 循环来控制并发度。我们可以使用 Promise.race 来实现类似的效果。

async function limitConcurrency(tasks, concurrency) {
    let results = [];
    let running = 0;
    const taskQueue = tasks[Symbol.iterator]();

    async function runTask() {
        if (running >= concurrency) {
            await Promise.race(tasks.filter(t => t!== null));
        }
        running++;
        let task = taskQueue.next();
        if (task.done) {
            running--;
            return;
        }
        try {
            let result = await task.value();
            results.push(result);
        } catch (err) {
            console.error(err);
        } finally {
            running--;
            runTask();
        }
    }

    for (let i = 0; i < concurrency; i++) {
        runTask();
    }

    while (running > 0) {
        await Promise.race(tasks.filter(t => t!== null));
    }

    return results;
}

// 使用示例
function task1() {
    return new Promise((resolve) => {
        setTimeout(() => {
            console.log('Task 1 completed');
            resolve('Task 1 result');
        }, 2000);
    });
}

function task2() {
    return new Promise((resolve) => {
        setTimeout(() => {
            console.log('Task 2 completed');
            resolve('Task 2 result');
        }, 1000);
    });
}

function task3() {
    return new Promise((resolve) => {
        setTimeout(() => {
            console.log('Task 3 completed');
            resolve('Task 3 result');
        }, 3000);
    });
}

const tasks = [task1, task2, task3];

limitConcurrency(tasks, 2).then(results => {
    console.log(results);
});

在这个例子中,limitConcurrency 函数接收任务数组和最大并发度作为参数。runTask 函数负责从任务队列中取出任务并执行,通过 Promise.race 来等待当前正在执行的任务中最快完成的一个,以控制并发度。for...of 循环启动初始的并发任务,最后通过一个 while 循环等待所有任务完成并返回结果。

并发处理中的错误处理

在并发处理中,错误处理至关重要。不同的异步编程模型有不同的错误处理方式。

Promise 的错误处理

在使用 Promise 进行并发处理时,.catch() 方法可以捕获整个 Promise 链中的错误。例如:

const fs = require('fs');
const util = require('util');

const readFilePromise = util.promisify(fs.readFile);

const fileNames = ['file1.txt', 'nonexistentfile.txt', 'file3.txt'];

const promises = fileNames.map(fileName => readFilePromise(fileName, 'utf8'));

Promise.all(promises)
   .then(dataArray => {
        let combinedData = '';
        dataArray.forEach(data => {
            combinedData += data;
        });
        console.log(combinedData);
    })
   .catch(err => {
        console.error('An error occurred:', err);
    });

在这个例子中,如果 nonexistentfile.txt 不存在,readFilePromise 会返回一个被拒绝的 Promise,Promise.all 会捕获这个错误并传递给 .catch() 回调函数。

async/await 的错误处理

在 async/await 中,使用 try...catch 块来捕获错误。例如:

async function readFiles() {
    try {
        let data1 = await readFilePromise('file1.txt', 'utf8');
        let data2 = await readFilePromise('nonexistentfile.txt', 'utf8');
        let data3 = await readFilePromise('file3.txt', 'utf8');
        console.log(data1 + data2 + data3);
    } catch (err) {
        console.error('An error occurred:', err);
    }
}

readFiles();

这里,await 表达式如果抛出错误,会被 try...catch 块捕获并处理。

并发与性能优化

合理的并发处理可以显著提高应用程序的性能,但也需要注意一些性能优化的要点。

减少不必要的并发

虽然并发可以提高效率,但并非所有任务都适合并发处理。如果任务之间存在紧密的依赖关系,或者任务本身执行时间非常短,并发处理可能会引入额外的开销,反而降低性能。例如,一些简单的计算任务,逐个顺序执行可能比并发执行更高效。

优化 I/O 操作

I/O 操作通常是性能瓶颈,因此优化 I/O 操作对于提高并发性能至关重要。可以采用以下几种方法:

  • 缓存:对于频繁读取的文件或网络数据,可以使用缓存机制,避免重复的 I/O 操作。例如,在 Node 中可以使用内存缓存(如 node-cache 库)来缓存文件内容或网络请求结果。
  • 批量操作:尽量将多个小的 I/O 操作合并成一个大的操作。例如,在写入文件时,可以将多个小的数据块合并成一个大的数据块再写入,减少文件系统的 I/O 次数。

监控和调优

在应用程序运行过程中,通过监控工具(如 Node.js 内置的 cluster 模块、node - inspector 等)来观察并发任务的执行情况,包括 CPU 和内存的使用情况、任务的执行时间等。根据监控结果,对并发度、任务分配等进行调优,以达到最佳的性能表现。

通过深入理解 JavaScript 在 Node 编程中的并发处理机制,合理运用异步编程模型、控制并发度、妥善处理错误以及进行性能优化,可以开发出高效、稳定的 Node.js 应用程序。无论是处理 I/O 密集型任务还是大量的计算任务,并发处理都能为应用程序带来显著的性能提升。