MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

JavaScript运用Node工作线程的策略

2022-08-302.6k 阅读

一、Node.js 工作线程简介

(一)JavaScript 的单线程模型

JavaScript 是一门单线程语言,在浏览器环境中,这一特性确保了 DOM 操作等任务不会产生竞态条件。因为浏览器需要处理复杂的用户交互,如点击、滚动等,如果采用多线程,多个线程同时修改 DOM 可能会导致不可预测的结果。在 Node.js 环境中,JavaScript 同样运行在单线程上。这意味着所有代码都是顺序执行的,同一时间只能执行一个任务。例如:

console.log('任务 1');
for (let i = 0; i < 1000000000; i++) {
    // 模拟一个耗时的计算任务
}
console.log('任务 2');

在上述代码中,只有当for循环这个耗时任务执行完毕后,才会打印出“任务 2”。这种单线程模型虽然简单,但对于 CPU 密集型任务,会阻塞事件循环,导致整个应用程序无响应。

(二)Node.js 工作线程的出现

为了解决单线程在处理 CPU 密集型任务时的局限性,Node.js 引入了工作线程(Worker Threads)。工作线程允许开发者在 Node.js 应用程序中创建多个线程来执行 JavaScript 代码。这些工作线程与主线程相互独立,它们有自己的事件循环、堆栈和全局对象(globalThis)。工作线程主要用于执行那些会阻塞主线程的 CPU 密集型任务,如数据加密、图像处理、复杂的数学计算等。通过将这些任务分配到工作线程中执行,主线程可以继续处理其他任务,如处理网络请求、响应事件等,从而提高应用程序的整体性能和响应性。

二、Node.js 工作线程基础使用

(一)创建工作线程

在 Node.js 中,创建一个工作线程非常简单。首先,需要引入worker_threads模块。以下是一个简单的示例,展示如何创建一个基本的工作线程:

  1. 主脚本(main.js)
const { Worker } = require('worker_threads');

// 创建一个新的工作线程
const worker = new Worker('./worker.js');

// 监听工作线程的消息事件
worker.on('message', (message) => {
    console.log('从工作线程接收到消息:', message);
});

// 向工作线程发送消息
worker.postMessage('你好,工作线程!');
  1. 工作线程脚本(worker.js)
const { parentPort } = require('worker_threads');

// 监听来自主线程的消息
parentPort.on('message', (message) => {
    console.log('从主线程接收到消息:', message);
    parentPort.postMessage('你好,主线程!');
});

在上述代码中,main.js作为主线程,创建了一个新的工作线程并向其发送消息。worker.js作为工作线程,接收主线程的消息并回复。通过postMessage方法在主线程和工作线程之间进行通信。

(二)传递数据

  1. 基本数据类型传递:传递基本数据类型(如字符串、数字、布尔值等)非常直接。在前面的示例中,已经展示了如何传递字符串。例如:
// main.js
const { Worker } = require('worker_threads');
const worker = new Worker('./worker.js');
worker.postMessage(42); // 传递一个数字
worker.on('message', (message) => {
    console.log('从工作线程接收到消息:', message);
});

// worker.js
const { parentPort } = require('worker_threads');
parentPort.on('message', (message) => {
    console.log('从主线程接收到消息:', message);
    parentPort.postMessage(message * 2); // 对数字进行简单处理并返回
});
  1. 复杂数据类型传递:对于复杂数据类型(如对象、数组),Node.js 采用结构化克隆算法进行传递。这意味着数据会被深拷贝,在工作线程中对数据的修改不会影响到主线程中的原始数据。例如:
// main.js
const { Worker } = require('worker_threads');
const data = { name: 'John', age: 30 };
const worker = new Worker('./worker.js');
worker.postMessage(data);
worker.on('message', (message) => {
    console.log('从工作线程接收到消息:', message);
});

// worker.js
const { parentPort } = require('worker_threads');
parentPort.on('message', (message) => {
    console.log('从主线程接收到消息:', message);
    message.age++; // 修改对象属性
    parentPort.postMessage(message);
});

在这个例子中,尽管在工作线程中修改了age属性,但主线程中的原始data对象并不会受到影响。

(三)共享数据

有时候,希望在主线程和工作线程之间共享数据,而不是进行拷贝传递。Node.js 提供了SharedArrayBufferAtomics来实现这一目的。

  1. 使用 SharedArrayBufferSharedArrayBuffer允许在多个线程之间共享一块内存区域。以下是一个简单的示例:
// main.js
const { Worker } = require('worker_threads');
const SharedArrayBuffer = require('worker_threads').SharedArrayBuffer;

// 创建一个 SharedArrayBuffer
const sab = new SharedArrayBuffer(16);
const int32Array = new Int32Array(sab);
int32Array[0] = 42;

const worker = new Worker('./worker.js', {
    workerData: { sab }
});

worker.on('message', (message) => {
    console.log('从工作线程接收到消息:', message);
    console.log('主线程中的数据:', int32Array[0]);
});

// worker.js
const { parentPort, workerData } = require('worker_threads');
const { Atomics } = require('worker_threads');

const sab = workerData.sab;
const int32Array = new Int32Array(sab);

Atomics.add(int32Array, 0, 1);

parentPort.postMessage('数据已修改');

在上述代码中,主线程创建了一个SharedArrayBuffer并传递给工作线程。工作线程使用Atomics对共享数据进行操作,主线程可以看到工作线程对数据的修改。

  1. Atomics 操作Atomics提供了一组原子操作方法,用于在共享内存上进行安全的读写操作。常见的方法有Atomics.addAtomics.storeAtomics.load等。这些方法确保了在多线程环境下对共享数据的操作是原子性的,避免了竞态条件。例如,Atomics.add方法会在指定位置增加一个值,并返回旧值,整个操作是原子的,不会被其他线程打断。

三、JavaScript 运用 Node 工作线程的策略

(一)任务拆分策略

  1. 识别 CPU 密集型任务:在应用程序中,首先要能够识别出哪些任务是 CPU 密集型的。例如,复杂的数学计算、数据加密解密、图像的复杂处理等。以图像的直方图均衡化处理为例,这是一个典型的 CPU 密集型任务,需要对图像的每个像素进行计算。
// 假设这是一个图像数据数组,每个元素代表一个像素值
const imageData = new Uint8Array([10, 20, 30, 40, 50, 60, 70, 80, 90, 100]);

// 直方图均衡化的简单模拟计算
function histogramEqualization(data) {
    // 计算直方图
    const histogram = new Array(256).fill(0);
    for (let i = 0; i < data.length; i++) {
        histogram[data[i]]++;
    }

    // 计算累积分布函数
    const cdf = new Array(256).fill(0);
    cdf[0] = histogram[0];
    for (let i = 1; i < 256; i++) {
        cdf[i] = cdf[i - 1] + histogram[i];
    }

    // 均衡化图像数据
    const output = new Uint8Array(data.length);
    for (let i = 0; i < data.length; i++) {
        output[i] = Math.floor((cdf[data[i]] / data.length) * 255);
    }
    return output;
}
  1. 合理拆分任务:将识别出的 CPU 密集型任务拆分成多个子任务,然后分配到不同的工作线程中执行。例如,对于一个大规模的矩阵乘法任务,可以将矩阵按行或按列拆分成多个子矩阵,每个工作线程负责计算一部分子矩阵的乘积。假设要计算矩阵A和矩阵B的乘积:
// 矩阵 A
const A = [
    [1, 2],
    [3, 4]
];

// 矩阵 B
const B = [
    [5, 6],
    [7, 8]
];

// 拆分矩阵 A 为子矩阵
const subMatrixA1 = [A[0]];
const subMatrixA2 = [A[1]];

// 工作线程 1 计算子矩阵乘积
function worker1() {
    const result1 = [];
    for (let i = 0; i < subMatrixA1.length; i++) {
        const row = [];
        for (let j = 0; j < B[0].length; j++) {
            let sum = 0;
            for (let k = 0; k < B.length; k++) {
                sum += subMatrixA1[i][k] * B[k][j];
            }
            row.push(sum);
        }
        result1.push(row);
    }
    return result1;
}

// 工作线程 2 计算子矩阵乘积
function worker2() {
    const result2 = [];
    for (let i = 0; i < subMatrixA2.length; i++) {
        const row = [];
        for (let j = 0; j < B[0].length; j++) {
            let sum = 0;
            for (let k = 0; k < B.length; k++) {
                sum += subMatrixA2[i][k] * B[k][j];
            }
            row.push(sum);
        }
        result2.push(row);
    }
    return result2;
}

// 合并结果
const result = worker1().concat(worker2());
console.log(result);

通过这种方式,利用多个工作线程并行计算,提高了矩阵乘法的执行效率。

(二)负载均衡策略

  1. 动态任务分配:为了实现负载均衡,主线程需要动态地将任务分配给空闲的工作线程。可以维护一个工作线程池,并记录每个工作线程的状态(忙碌或空闲)。当有新任务到来时,主线程检查工作线程池,将任务分配给空闲的工作线程。例如:
const { Worker } = require('worker_threads');

// 工作线程池
const workerPool = [];
const numWorkers = 4;

// 创建工作线程池
for (let i = 0; i < numWorkers; i++) {
    const worker = new Worker('./worker.js');
    workerPool.push(worker);
    worker.on('message', (message) => {
        console.log('从工作线程', i, '接收到消息:', message);
    });
    worker.on('exit', () => {
        console.log('工作线程', i, '已退出');
    });
}

// 模拟任务队列
const taskQueue = [1, 2, 3, 4, 5, 6, 7, 8];

// 动态任务分配
function distributeTasks() {
    while (taskQueue.length > 0 && workerPool.some(worker => worker.readyState === 1)) {
        const task = taskQueue.shift();
        const availableWorker = workerPool.find(worker => worker.readyState === 1);
        if (availableWorker) {
            availableWorker.postMessage(task);
        }
    }
}

// 启动任务分配
distributeTasks();

在这个示例中,主线程维护一个工作线程池,并根据工作线程的就绪状态动态地分配任务。

  1. 任务优先级:在实际应用中,可能会有不同优先级的任务。可以为任务设置优先级,主线程优先将高优先级的任务分配给工作线程。例如,可以定义一个任务对象,包含任务数据和优先级字段:
const { Worker } = require('worker_threads');

// 工作线程池
const workerPool = [];
const numWorkers = 4;

// 创建工作线程池
for (let i = 0; i < numWorkers; i++) {
    const worker = new Worker('./worker.js');
    workerPool.push(worker);
    worker.on('message', (message) => {
        console.log('从工作线程', i, '接收到消息:', message);
    });
    worker.on('exit', () => {
        console.log('工作线程', i, '已退出');
    });
}

// 模拟任务队列,包含优先级
const taskQueue = [
    { data: 1, priority: 2 },
    { data: 2, priority: 1 },
    { data: 3, priority: 2 },
    { data: 4, priority: 3 }
];

// 根据优先级对任务队列进行排序
taskQueue.sort((a, b) => b.priority - a.priority);

// 动态任务分配
function distributeTasks() {
    while (taskQueue.length > 0 && workerPool.some(worker => worker.readyState === 1)) {
        const task = taskQueue.shift();
        const availableWorker = workerPool.find(worker => worker.readyState === 1);
        if (availableWorker) {
            availableWorker.postMessage(task.data);
        }
    }
}

// 启动任务分配
distributeTasks();

通过这种方式,高优先级的任务能够更快地得到处理。

(三)错误处理策略

  1. 工作线程内部错误处理:在工作线程中,需要对可能出现的错误进行捕获和处理。可以使用try - catch块来捕获同步代码中的错误,对于异步操作,可以使用Promisecatch方法。例如:
const { parentPort } = require('worker_threads');

parentPort.on('message', (message) => {
    try {
        // 假设这里有一个可能会抛出错误的操作
        if (message === 'error') {
            throw new Error('模拟错误');
        }
        parentPort.postMessage('任务成功完成');
    } catch (error) {
        parentPort.postMessage({ error: error.message });
    }
});
  1. 主线程接收错误:主线程需要监听工作线程发送的错误消息,并进行相应的处理。可以通过监听工作线程的message事件,判断消息是否包含错误信息。例如:
const { Worker } = require('worker_threads');

const worker = new Worker('./worker.js');

worker.on('message', (message) => {
    if ('error' in message) {
        console.error('工作线程发生错误:', message.error);
    } else {
        console.log('工作线程返回结果:', message);
    }
});

worker.postMessage('error');

通过这种方式,主线程能够及时了解工作线程中发生的错误,并采取相应的措施,如重试任务、记录错误日志等。

四、性能优化与注意事项

(一)性能优化

  1. 减少通信开销:主线程和工作线程之间的通信会带来一定的开销,因为数据需要进行序列化和反序列化(除了SharedArrayBuffer)。尽量减少不必要的通信,批量传递数据而不是频繁地发送小数据。例如,在进行多次计算任务时,可以将多个计算任务的数据一次性传递给工作线程,工作线程完成所有计算后再一次性返回结果。
// main.js
const { Worker } = require('worker_threads');

const tasks = [1, 2, 3, 4, 5];

const worker = new Worker('./worker.js');

worker.on('message', (message) => {
    console.log('从工作线程接收到结果:', message);
});

worker.postMessage(tasks);

// worker.js
const { parentPort } = require('worker_threads');

parentPort.on('message', (tasks) => {
    const results = tasks.map(task => task * 2);
    parentPort.postMessage(results);
});
  1. 合理设置工作线程数量:工作线程数量并非越多越好。过多的工作线程会增加系统的资源开销,如线程的创建、调度等。需要根据服务器的 CPU 核心数和任务的特性来合理设置工作线程数量。一般来说,可以将工作线程数量设置为 CPU 核心数,这样可以充分利用 CPU 资源,同时避免过多线程带来的开销。例如,在多核服务器上,可以通过os.cpus()方法获取 CPU 核心数:
const os = require('os');
const { Worker } = require('worker_threads');

const numCPUs = os.cpus().length;
const workerPool = [];

for (let i = 0; i < numCPUs; i++) {
    const worker = new Worker('./worker.js');
    workerPool.push(worker);
}

(二)注意事项

  1. 全局对象差异:工作线程有自己的全局对象globalThis,它与主线程的global对象有所不同。在工作线程中,没有process对象(只有部分属性可用,如process.env),也没有require方法(需要通过worker_threads模块的workerData传递模块路径来加载模块)。例如,如果要在工作线程中使用第三方模块,需要在主线程中准备好模块并通过workerData传递:
// main.js
const { Worker } = require('worker_threads');
const axios = require('axios');

const worker = new Worker('./worker.js', {
    workerData: { axios }
});

worker.on('message', (message) => {
    console.log('从工作线程接收到消息:', message);
});

// worker.js
const { parentPort, workerData } = require('worker_threads');

const axios = workerData.axios;

parentPort.on('message', async () => {
    try {
        const response = await axios.get('https://example.com');
        parentPort.postMessage(response.data);
    } catch (error) {
        parentPort.postMessage({ error: error.message });
    }
});
  1. 资源共享与竞争:虽然SharedArrayBuffer提供了共享数据的能力,但在多线程环境下对共享资源的访问需要特别小心,以避免竞态条件。始终使用Atomics方法来操作SharedArrayBuffer,确保操作的原子性。另外,要注意工作线程之间可能对其他共享资源(如文件系统、数据库连接等)的竞争,需要采用适当的同步机制(如锁)来保证资源的安全访问。例如,在多个工作线程同时写入同一个文件时,可能会导致数据损坏,可以使用文件锁来确保同一时间只有一个工作线程能够写入文件。

通过合理运用上述策略和注意事项,开发者可以在 Node.js 应用程序中有效地利用工作线程,提高应用程序的性能和响应性,特别是在处理 CPU 密集型任务时。同时,要不断根据实际应用场景进行测试和优化,以达到最佳的性能表现。