MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Node.js 使用流 Stream 处理大文件

2021-04-042.2k 阅读

一、Node.js 流 Stream 概述

1.1 什么是流

在 Node.js 中,流(Stream)是一种抽象接口,用于处理 Node.js 中持续的数据流。Node.js 中有四种基本的流类型:可读流(Readable Stream)、可写流(Writable Stream)、双工流(Duplex Stream)和转换流(Transform Stream)。流就像是一个管道,数据可以通过这个管道流动,而不是一次性加载到内存中。这使得流在处理大文件、网络数据传输等场景下非常高效,因为它可以避免一次性将大量数据读入内存,从而减少内存压力。

1.2 流的优势

  1. 内存管理:对于大文件,若一次性读入内存,很可能导致内存溢出。而流是逐块处理数据,每次处理的数据量小,极大地降低了内存的使用压力。
  2. 性能提升:流能够边读边处理数据,无需等待整个文件都读入内存才开始操作,从而提高了数据处理的效率。例如,在处理日志文件时,无需等待日志文件全部加载完毕再进行分析,而是可以实时分析日志数据。
  3. 灵活性:可以方便地将多个流连接起来,形成复杂的数据处理管道。例如,可以将可读流连接到可写流,实现文件的高效复制;也可以将转换流插入到数据处理管道中,对数据进行转换处理。

二、可读流 Readable Stream

2.1 创建可读流

在 Node.js 中,可以通过 fs.createReadStream() 方法来创建一个可读流。以下是一个简单的示例,展示如何创建一个可读流来读取文件:

const fs = require('fs');
const readableStream = fs.createReadStream('largeFile.txt');

在上述代码中,fs.createReadStream() 方法接受一个参数,即要读取的文件路径。默认情况下,可读流处于暂停模式,这意味着它不会自动开始读取数据,直到你调用 resume() 方法或者监听 data 事件。

2.2 可读流的事件

  1. data:当有新的数据块可读时,会触发该事件。事件回调函数会接收到一个 Buffer 对象,代表读取到的数据块。例如:
readableStream.on('data', (chunk) => {
    console.log('Received a chunk of data:', chunk.length, 'bytes');
});
  1. end:当没有更多数据可读时,会触发该事件。这通常意味着文件已经读取完毕。
readableStream.on('end', () => {
    console.log('All data has been read.');
});
  1. error:如果在读取过程中发生错误,会触发该事件。事件回调函数会接收到一个 Error 对象,描述错误信息。
readableStream.on('error', (err) => {
    console.error('Error reading file:', err);
});
  1. pause:当流被暂停时触发该事件。例如,当调用 pause() 方法时,会触发此事件。
readableStream.on('pause', () => {
    console.log('Stream has been paused.');
});
  1. resume:当流从暂停状态恢复时触发该事件。例如,当调用 resume() 方法时,会触发此事件。
readableStream.on('resume', () => {
    console.log('Stream has been resumed.');
});

2.3 可读流的模式

  1. 流动模式:在流动模式下,可读流会自动从底层系统读取数据,并通过 data 事件将数据块传递给应用程序。可以通过监听 data 事件来处理数据。当没有监听器监听 data 事件时,数据将会丢失。可以通过调用 resume() 方法或者监听 data 事件进入流动模式。
  2. 暂停模式:在暂停模式下,可读流不会自动读取数据,需要手动调用 read() 方法来读取数据块。默认情况下,新创建的可读流处于暂停模式。在暂停模式下,即使有 data 事件的监听器,数据也不会自动流动,需要调用 resume() 方法来启动数据流动。

2.4 手动读取数据(暂停模式)

在暂停模式下,可以通过调用 read() 方法手动读取数据。read() 方法接受一个可选参数 size,表示要读取的数据块大小。如果不传递 size 参数,read() 方法会尽可能多地读取数据。以下是一个示例:

const fs = require('fs');
const readableStream = fs.createReadStream('largeFile.txt');

// 手动读取数据
let chunk;
while (null!== (chunk = readableStream.read(1024))) {
    console.log('Read a chunk of data:', chunk.length, 'bytes');
}

// 监听 end 事件
readableStream.on('end', () => {
    console.log('All data has been read.');
});

在上述代码中,通过 while 循环不断调用 read(1024) 方法,每次读取 1024 字节的数据块。当 read() 方法返回 null 时,表示没有更多数据可读。

三、可写流 Writable Stream

3.1 创建可写流

可以使用 fs.createWriteStream() 方法来创建一个可写流,用于将数据写入文件。以下是一个简单的示例:

const fs = require('fs');
const writableStream = fs.createWriteStream('outputFile.txt');

在上述代码中,fs.createWriteStream() 方法接受一个参数,即要写入的文件路径。如果文件不存在,会自动创建该文件;如果文件已存在,会覆盖原有内容。

3.2 可写流的事件

  1. drain:当可写流的内部缓冲区被排空,即可以再次写入数据时,会触发该事件。这通常发生在缓冲区已满,调用 write() 方法返回 false 后,当缓冲区有空间时会触发此事件。例如:
writableStream.on('drain', () => {
    console.log('Buffer has been drained. Can write more data.');
});
  1. finish:当所有数据都已被写入底层系统(例如文件),并且调用了 end() 方法后,会触发该事件。
writableStream.on('finish', () => {
    console.log('All data has been written.');
});
  1. error:如果在写入过程中发生错误,会触发该事件。事件回调函数会接收到一个 Error 对象,描述错误信息。
writableStream.on('error', (err) => {
    console.error('Error writing file:', err);
});

3.3 写入数据

可以通过调用 write() 方法向可写流写入数据。write() 方法接受两个参数:要写入的数据(可以是字符串或 Buffer 对象)和一个可选的编码(当数据为字符串时需要指定编码)。例如:

const fs = require('fs');
const writableStream = fs.createWriteStream('outputFile.txt');

const data = 'This is some data to write.';
const writeResult = writableStream.write(data, 'utf8');
if (!writeResult) {
    console.log('Buffer is full. Cannot write more data immediately.');
}

在上述代码中,调用 write() 方法将字符串数据写入可写流。write() 方法返回一个布尔值,表示数据是否成功写入内部缓冲区。如果返回 false,表示缓冲区已满,需要等待 drain 事件触发后再继续写入。

3.4 结束写入

当完成数据写入后,需要调用 end() 方法来通知可写流没有更多数据要写入了。end() 方法可以接受一个可选参数,用于在结束写入前再写入一些数据。例如:

const fs = require('fs');
const writableStream = fs.createWriteStream('outputFile.txt');

const data = 'This is some data to write.';
writableStream.write(data, 'utf8');

// 结束写入,并在结束前再写入一些数据
writableStream.end('This is the end of the data.', 'utf8');

writableStream.on('finish', () => {
    console.log('All data has been written.');
});

在上述代码中,先调用 write() 方法写入部分数据,然后调用 end() 方法结束写入,并在结束前再写入一些数据。当所有数据都被写入后,会触发 finish 事件。

四、使用流处理大文件

4.1 文件复制

使用可读流和可写流可以高效地实现文件复制功能。以下是一个示例:

const fs = require('fs');

const readableStream = fs.createReadStream('sourceFile.txt');
const writableStream = fs.createWriteStream('destinationFile.txt');

readableStream.on('data', (chunk) => {
    const writeResult = writableStream.write(chunk);
    if (!writeResult) {
        readableStream.pause();
        writableStream.once('drain', () => {
            readableStream.resume();
        });
    }
});

readableStream.on('end', () => {
    writableStream.end();
});

readableStream.on('error', (err) => {
    console.error('Error reading source file:', err);
});

writableStream.on('error', (err) => {
    console.error('Error writing to destination file:', err);
});

在上述代码中,通过监听可读流的 data 事件,每次读取到数据块后就写入可写流。如果可写流的缓冲区已满,write() 方法返回 false,此时暂停可读流,等待可写流的 drain 事件触发后再恢复可读流。当可读流读取完毕,调用可写流的 end() 方法结束写入。

4.2 大文件处理示例

假设我们有一个非常大的文本文件,里面包含了大量的日志信息,每行日志记录了一个事件。我们想要统计文件中包含特定关键字(例如 "error")的行数。以下是使用流来实现该功能的示例:

const fs = require('fs');

const readableStream = fs.createReadStream('largeLogFile.txt', { encoding: 'utf8' });
let errorCount = 0;

readableStream.on('data', (chunk) => {
    const lines = chunk.split('\n');
    for (const line of lines) {
        if (line.includes('error')) {
            errorCount++;
        }
    }
});

readableStream.on('end', () => {
    console.log('Number of lines with "error":', errorCount);
});

readableStream.on('error', (err) => {
    console.error('Error reading file:', err);
});

在上述代码中,通过设置 encoding: 'utf8',可读流会将读取到的数据以 UTF - 8 编码的字符串形式传递。每次接收到数据块后,使用 split('\n') 方法将数据块按行分割,然后逐行检查是否包含关键字 "error",如果包含则增加计数器。当文件读取完毕,输出包含关键字的行数。

五、双工流 Duplex Stream

5.1 什么是双工流

双工流(Duplex Stream)是一种既可以作为可读流又可以作为可写流的流类型。它同时实现了 ReadableWritable 接口,允许数据在两个方向上流动。例如,网络套接字(net.Socket)就是一个双工流,它可以接收数据(可读),也可以发送数据(可写)。

5.2 创建自定义双工流

在 Node.js 中,可以通过继承 stream.Duplex 类来创建自定义双工流。以下是一个简单的示例,展示如何创建一个简单的双工流,它会将写入的数据转换为大写后再读取出来:

const { Duplex } = require('stream');

class UpperCaseStream extends Duplex {
    constructor() {
        super();
    }

    _write(chunk, encoding, callback) {
        const upperCaseChunk = chunk.toString().toUpperCase();
        this.push(upperCaseChunk);
        callback();
    }

    _read(size) {
        // 这里不需要实现复杂逻辑,因为我们在 _write 中已经推送了数据
    }
}

const upperCaseStream = new UpperCaseStream();

process.stdin.pipe(upperCaseStream).pipe(process.stdout);

在上述代码中,我们定义了一个 UpperCaseStream 类,继承自 Duplex。在 _write 方法中,将接收到的数据块转换为大写后,通过 push() 方法将其作为可读数据推送出去。_read 方法在这里可以保持简单,因为我们在 _write 中已经处理了数据。最后,将标准输入(process.stdin)通过这个双工流连接到标准输出(process.stdout),这样输入的内容会被转换为大写后输出。

六、转换流 Transform Stream

6.1 什么是转换流

转换流(Transform Stream)是双工流的一种特殊类型,它在写入数据和读取数据之间对数据进行转换。转换流实现了 _transform_flush 方法。_transform 方法用于处理输入数据并生成输出数据,_flush 方法用于在所有数据都写入后执行一些清理操作。

6.2 创建自定义转换流

以下是一个示例,展示如何创建一个自定义转换流,将输入的字符串数据转换为 Base64 编码:

const { Transform } = require('stream');

class Base64Transform extends Transform {
    constructor() {
        super();
    }

    _transform(chunk, encoding, callback) {
        const base64Chunk = Buffer.from(chunk.toString()).toString('base64');
        this.push(base64Chunk);
        callback();
    }

    _flush(callback) {
        callback();
    }
}

const base64Transform = new Base64Transform();

process.stdin.pipe(base64Transform).pipe(process.stdout);

在上述代码中,我们定义了一个 Base64Transform 类,继承自 Transform。在 _transform 方法中,将接收到的数据块转换为 Base64 编码后,通过 push() 方法将其作为输出数据推送出去。_flush 方法在这里只是简单地调用 callback(),表示清理操作完成。最后,将标准输入通过这个转换流连接到标准输出,这样输入的字符串会被转换为 Base64 编码后输出。

6.3 在大文件处理中使用转换流

假设我们有一个大的文本文件,我们想要将文件中的所有内容转换为 Base64 编码并保存到另一个文件中。以下是使用转换流来实现该功能的示例:

const fs = require('fs');
const { Transform } = require('stream');

class Base64Transform extends Transform {
    constructor() {
        super();
    }

    _transform(chunk, encoding, callback) {
        const base64Chunk = Buffer.from(chunk.toString()).toString('base64');
        this.push(base64Chunk);
        callback();
    }

    _flush(callback) {
        callback();
    }
}

const base64Transform = new Base64Transform();

const readableStream = fs.createReadStream('largeTextFile.txt');
const writableStream = fs.createWriteStream('base64EncodedFile.txt');

readableStream.pipe(base64Transform).pipe(writableStream);

readableStream.on('error', (err) => {
    console.error('Error reading file:', err);
});

writableStream.on('error', (err) => {
    console.error('Error writing file:', err);
});

在上述代码中,通过创建一个 Base64Transform 转换流,将可读流(读取大文本文件)和可写流(写入 Base64 编码后的文件)连接起来。这样,在读取文件数据的过程中,数据会被实时转换为 Base64 编码并写入目标文件,高效地处理了大文件的转换和存储。

七、流的背压处理

7.1 什么是背压

背压(Backpressure)是指在数据处理管道中,当数据生产速度超过数据消费速度时,产生的一种压力现象。例如,在一个由可读流、转换流和可写流组成的数据处理管道中,如果可读流读取数据的速度过快,而可写流写入数据的速度较慢,就会导致数据在管道中堆积,最终可能导致内存溢出等问题。

7.2 背压的处理方法

  1. 暂停和恢复可读流:如前面文件复制示例中所示,当可写流的 write() 方法返回 false 时,表示可写流的缓冲区已满,此时暂停可读流,等待 drain 事件触发后再恢复可读流。这样可以控制可读流的数据读取速度,避免数据过度堆积。
  2. 使用 highWaterMark:在创建可读流和可写流时,可以通过 highWaterMark 参数来设置内部缓冲区的大小。例如:
const readableStream = fs.createReadStream('largeFile.txt', { highWaterMark: 16384 }); // 16KB buffer
const writableStream = fs.createWriteStream('outputFile.txt', { highWaterMark: 16384 }); // 16KB buffer

较小的 highWaterMark 值可以减少缓冲区的大小,从而更早地触发背压处理机制,避免过多的数据在缓冲区中堆积。 3. 转换流的背压处理:对于转换流,同样需要处理背压。在 _transform 方法中,如果输出数据的速度较慢,也可能导致输入数据在转换流的缓冲区中堆积。可以通过控制 push() 方法的调用频率来处理背压。例如,当 push() 方法返回 false 时,表示输出缓冲区已满,此时可以暂停输入数据的处理,直到输出缓冲区有空间。

const { Transform } = require('stream');

class SlowTransform extends Transform {
    constructor() {
        super();
        this.pauseInput = false;
    }

    _transform(chunk, encoding, callback) {
        if (this.pauseInput) {
            // 如果输出缓冲区已满,暂存数据,不处理
            this.buffer.push(chunk);
            return;
        }
        const transformedChunk = chunk.toString().toUpperCase();
        const pushResult = this.push(transformedChunk);
        if (!pushResult) {
            this.pauseInput = true;
        }
        callback();
    }

    _flush(callback) {
        // 处理暂存的数据
        while (this.buffer.length > 0) {
            const chunk = this.buffer.shift();
            const transformedChunk = chunk.toString().toUpperCase();
            const pushResult = this.push(transformedChunk);
            if (!pushResult) {
                break;
            }
        }
        callback();
    }
}

在上述代码中,SlowTransform 转换流通过 pauseInput 变量来控制是否暂停输入数据的处理。当 push() 方法返回 false 时,设置 pauseInputtrue,暂存输入数据。在 _flush 方法中,处理暂存的数据,直到输出缓冲区再次有空间。

通过合理处理背压,可以确保在处理大文件等场景下,数据处理管道的稳定性和高效性,避免因数据堆积导致的性能问题和内存问题。