Node.js 使用流 Stream 处理大文件
一、Node.js 流 Stream 概述
1.1 什么是流
在 Node.js 中,流(Stream)是一种抽象接口,用于处理 Node.js 中持续的数据流。Node.js 中有四种基本的流类型:可读流(Readable Stream)、可写流(Writable Stream)、双工流(Duplex Stream)和转换流(Transform Stream)。流就像是一个管道,数据可以通过这个管道流动,而不是一次性加载到内存中。这使得流在处理大文件、网络数据传输等场景下非常高效,因为它可以避免一次性将大量数据读入内存,从而减少内存压力。
1.2 流的优势
- 内存管理:对于大文件,若一次性读入内存,很可能导致内存溢出。而流是逐块处理数据,每次处理的数据量小,极大地降低了内存的使用压力。
- 性能提升:流能够边读边处理数据,无需等待整个文件都读入内存才开始操作,从而提高了数据处理的效率。例如,在处理日志文件时,无需等待日志文件全部加载完毕再进行分析,而是可以实时分析日志数据。
- 灵活性:可以方便地将多个流连接起来,形成复杂的数据处理管道。例如,可以将可读流连接到可写流,实现文件的高效复制;也可以将转换流插入到数据处理管道中,对数据进行转换处理。
二、可读流 Readable Stream
2.1 创建可读流
在 Node.js 中,可以通过 fs.createReadStream()
方法来创建一个可读流。以下是一个简单的示例,展示如何创建一个可读流来读取文件:
const fs = require('fs');
const readableStream = fs.createReadStream('largeFile.txt');
在上述代码中,fs.createReadStream()
方法接受一个参数,即要读取的文件路径。默认情况下,可读流处于暂停模式,这意味着它不会自动开始读取数据,直到你调用 resume()
方法或者监听 data
事件。
2.2 可读流的事件
- data:当有新的数据块可读时,会触发该事件。事件回调函数会接收到一个
Buffer
对象,代表读取到的数据块。例如:
readableStream.on('data', (chunk) => {
console.log('Received a chunk of data:', chunk.length, 'bytes');
});
- end:当没有更多数据可读时,会触发该事件。这通常意味着文件已经读取完毕。
readableStream.on('end', () => {
console.log('All data has been read.');
});
- error:如果在读取过程中发生错误,会触发该事件。事件回调函数会接收到一个
Error
对象,描述错误信息。
readableStream.on('error', (err) => {
console.error('Error reading file:', err);
});
- pause:当流被暂停时触发该事件。例如,当调用
pause()
方法时,会触发此事件。
readableStream.on('pause', () => {
console.log('Stream has been paused.');
});
- resume:当流从暂停状态恢复时触发该事件。例如,当调用
resume()
方法时,会触发此事件。
readableStream.on('resume', () => {
console.log('Stream has been resumed.');
});
2.3 可读流的模式
- 流动模式:在流动模式下,可读流会自动从底层系统读取数据,并通过
data
事件将数据块传递给应用程序。可以通过监听data
事件来处理数据。当没有监听器监听data
事件时,数据将会丢失。可以通过调用resume()
方法或者监听data
事件进入流动模式。 - 暂停模式:在暂停模式下,可读流不会自动读取数据,需要手动调用
read()
方法来读取数据块。默认情况下,新创建的可读流处于暂停模式。在暂停模式下,即使有data
事件的监听器,数据也不会自动流动,需要调用resume()
方法来启动数据流动。
2.4 手动读取数据(暂停模式)
在暂停模式下,可以通过调用 read()
方法手动读取数据。read()
方法接受一个可选参数 size
,表示要读取的数据块大小。如果不传递 size
参数,read()
方法会尽可能多地读取数据。以下是一个示例:
const fs = require('fs');
const readableStream = fs.createReadStream('largeFile.txt');
// 手动读取数据
let chunk;
while (null!== (chunk = readableStream.read(1024))) {
console.log('Read a chunk of data:', chunk.length, 'bytes');
}
// 监听 end 事件
readableStream.on('end', () => {
console.log('All data has been read.');
});
在上述代码中,通过 while
循环不断调用 read(1024)
方法,每次读取 1024 字节的数据块。当 read()
方法返回 null
时,表示没有更多数据可读。
三、可写流 Writable Stream
3.1 创建可写流
可以使用 fs.createWriteStream()
方法来创建一个可写流,用于将数据写入文件。以下是一个简单的示例:
const fs = require('fs');
const writableStream = fs.createWriteStream('outputFile.txt');
在上述代码中,fs.createWriteStream()
方法接受一个参数,即要写入的文件路径。如果文件不存在,会自动创建该文件;如果文件已存在,会覆盖原有内容。
3.2 可写流的事件
- drain:当可写流的内部缓冲区被排空,即可以再次写入数据时,会触发该事件。这通常发生在缓冲区已满,调用
write()
方法返回false
后,当缓冲区有空间时会触发此事件。例如:
writableStream.on('drain', () => {
console.log('Buffer has been drained. Can write more data.');
});
- finish:当所有数据都已被写入底层系统(例如文件),并且调用了
end()
方法后,会触发该事件。
writableStream.on('finish', () => {
console.log('All data has been written.');
});
- error:如果在写入过程中发生错误,会触发该事件。事件回调函数会接收到一个
Error
对象,描述错误信息。
writableStream.on('error', (err) => {
console.error('Error writing file:', err);
});
3.3 写入数据
可以通过调用 write()
方法向可写流写入数据。write()
方法接受两个参数:要写入的数据(可以是字符串或 Buffer
对象)和一个可选的编码(当数据为字符串时需要指定编码)。例如:
const fs = require('fs');
const writableStream = fs.createWriteStream('outputFile.txt');
const data = 'This is some data to write.';
const writeResult = writableStream.write(data, 'utf8');
if (!writeResult) {
console.log('Buffer is full. Cannot write more data immediately.');
}
在上述代码中,调用 write()
方法将字符串数据写入可写流。write()
方法返回一个布尔值,表示数据是否成功写入内部缓冲区。如果返回 false
,表示缓冲区已满,需要等待 drain
事件触发后再继续写入。
3.4 结束写入
当完成数据写入后,需要调用 end()
方法来通知可写流没有更多数据要写入了。end()
方法可以接受一个可选参数,用于在结束写入前再写入一些数据。例如:
const fs = require('fs');
const writableStream = fs.createWriteStream('outputFile.txt');
const data = 'This is some data to write.';
writableStream.write(data, 'utf8');
// 结束写入,并在结束前再写入一些数据
writableStream.end('This is the end of the data.', 'utf8');
writableStream.on('finish', () => {
console.log('All data has been written.');
});
在上述代码中,先调用 write()
方法写入部分数据,然后调用 end()
方法结束写入,并在结束前再写入一些数据。当所有数据都被写入后,会触发 finish
事件。
四、使用流处理大文件
4.1 文件复制
使用可读流和可写流可以高效地实现文件复制功能。以下是一个示例:
const fs = require('fs');
const readableStream = fs.createReadStream('sourceFile.txt');
const writableStream = fs.createWriteStream('destinationFile.txt');
readableStream.on('data', (chunk) => {
const writeResult = writableStream.write(chunk);
if (!writeResult) {
readableStream.pause();
writableStream.once('drain', () => {
readableStream.resume();
});
}
});
readableStream.on('end', () => {
writableStream.end();
});
readableStream.on('error', (err) => {
console.error('Error reading source file:', err);
});
writableStream.on('error', (err) => {
console.error('Error writing to destination file:', err);
});
在上述代码中,通过监听可读流的 data
事件,每次读取到数据块后就写入可写流。如果可写流的缓冲区已满,write()
方法返回 false
,此时暂停可读流,等待可写流的 drain
事件触发后再恢复可读流。当可读流读取完毕,调用可写流的 end()
方法结束写入。
4.2 大文件处理示例
假设我们有一个非常大的文本文件,里面包含了大量的日志信息,每行日志记录了一个事件。我们想要统计文件中包含特定关键字(例如 "error")的行数。以下是使用流来实现该功能的示例:
const fs = require('fs');
const readableStream = fs.createReadStream('largeLogFile.txt', { encoding: 'utf8' });
let errorCount = 0;
readableStream.on('data', (chunk) => {
const lines = chunk.split('\n');
for (const line of lines) {
if (line.includes('error')) {
errorCount++;
}
}
});
readableStream.on('end', () => {
console.log('Number of lines with "error":', errorCount);
});
readableStream.on('error', (err) => {
console.error('Error reading file:', err);
});
在上述代码中,通过设置 encoding: 'utf8'
,可读流会将读取到的数据以 UTF - 8 编码的字符串形式传递。每次接收到数据块后,使用 split('\n')
方法将数据块按行分割,然后逐行检查是否包含关键字 "error",如果包含则增加计数器。当文件读取完毕,输出包含关键字的行数。
五、双工流 Duplex Stream
5.1 什么是双工流
双工流(Duplex Stream)是一种既可以作为可读流又可以作为可写流的流类型。它同时实现了 Readable
和 Writable
接口,允许数据在两个方向上流动。例如,网络套接字(net.Socket
)就是一个双工流,它可以接收数据(可读),也可以发送数据(可写)。
5.2 创建自定义双工流
在 Node.js 中,可以通过继承 stream.Duplex
类来创建自定义双工流。以下是一个简单的示例,展示如何创建一个简单的双工流,它会将写入的数据转换为大写后再读取出来:
const { Duplex } = require('stream');
class UpperCaseStream extends Duplex {
constructor() {
super();
}
_write(chunk, encoding, callback) {
const upperCaseChunk = chunk.toString().toUpperCase();
this.push(upperCaseChunk);
callback();
}
_read(size) {
// 这里不需要实现复杂逻辑,因为我们在 _write 中已经推送了数据
}
}
const upperCaseStream = new UpperCaseStream();
process.stdin.pipe(upperCaseStream).pipe(process.stdout);
在上述代码中,我们定义了一个 UpperCaseStream
类,继承自 Duplex
。在 _write
方法中,将接收到的数据块转换为大写后,通过 push()
方法将其作为可读数据推送出去。_read
方法在这里可以保持简单,因为我们在 _write
中已经处理了数据。最后,将标准输入(process.stdin
)通过这个双工流连接到标准输出(process.stdout
),这样输入的内容会被转换为大写后输出。
六、转换流 Transform Stream
6.1 什么是转换流
转换流(Transform Stream)是双工流的一种特殊类型,它在写入数据和读取数据之间对数据进行转换。转换流实现了 _transform
和 _flush
方法。_transform
方法用于处理输入数据并生成输出数据,_flush
方法用于在所有数据都写入后执行一些清理操作。
6.2 创建自定义转换流
以下是一个示例,展示如何创建一个自定义转换流,将输入的字符串数据转换为 Base64 编码:
const { Transform } = require('stream');
class Base64Transform extends Transform {
constructor() {
super();
}
_transform(chunk, encoding, callback) {
const base64Chunk = Buffer.from(chunk.toString()).toString('base64');
this.push(base64Chunk);
callback();
}
_flush(callback) {
callback();
}
}
const base64Transform = new Base64Transform();
process.stdin.pipe(base64Transform).pipe(process.stdout);
在上述代码中,我们定义了一个 Base64Transform
类,继承自 Transform
。在 _transform
方法中,将接收到的数据块转换为 Base64 编码后,通过 push()
方法将其作为输出数据推送出去。_flush
方法在这里只是简单地调用 callback()
,表示清理操作完成。最后,将标准输入通过这个转换流连接到标准输出,这样输入的字符串会被转换为 Base64 编码后输出。
6.3 在大文件处理中使用转换流
假设我们有一个大的文本文件,我们想要将文件中的所有内容转换为 Base64 编码并保存到另一个文件中。以下是使用转换流来实现该功能的示例:
const fs = require('fs');
const { Transform } = require('stream');
class Base64Transform extends Transform {
constructor() {
super();
}
_transform(chunk, encoding, callback) {
const base64Chunk = Buffer.from(chunk.toString()).toString('base64');
this.push(base64Chunk);
callback();
}
_flush(callback) {
callback();
}
}
const base64Transform = new Base64Transform();
const readableStream = fs.createReadStream('largeTextFile.txt');
const writableStream = fs.createWriteStream('base64EncodedFile.txt');
readableStream.pipe(base64Transform).pipe(writableStream);
readableStream.on('error', (err) => {
console.error('Error reading file:', err);
});
writableStream.on('error', (err) => {
console.error('Error writing file:', err);
});
在上述代码中,通过创建一个 Base64Transform
转换流,将可读流(读取大文本文件)和可写流(写入 Base64 编码后的文件)连接起来。这样,在读取文件数据的过程中,数据会被实时转换为 Base64 编码并写入目标文件,高效地处理了大文件的转换和存储。
七、流的背压处理
7.1 什么是背压
背压(Backpressure)是指在数据处理管道中,当数据生产速度超过数据消费速度时,产生的一种压力现象。例如,在一个由可读流、转换流和可写流组成的数据处理管道中,如果可读流读取数据的速度过快,而可写流写入数据的速度较慢,就会导致数据在管道中堆积,最终可能导致内存溢出等问题。
7.2 背压的处理方法
- 暂停和恢复可读流:如前面文件复制示例中所示,当可写流的
write()
方法返回false
时,表示可写流的缓冲区已满,此时暂停可读流,等待drain
事件触发后再恢复可读流。这样可以控制可读流的数据读取速度,避免数据过度堆积。 - 使用
highWaterMark
:在创建可读流和可写流时,可以通过highWaterMark
参数来设置内部缓冲区的大小。例如:
const readableStream = fs.createReadStream('largeFile.txt', { highWaterMark: 16384 }); // 16KB buffer
const writableStream = fs.createWriteStream('outputFile.txt', { highWaterMark: 16384 }); // 16KB buffer
较小的 highWaterMark
值可以减少缓冲区的大小,从而更早地触发背压处理机制,避免过多的数据在缓冲区中堆积。
3. 转换流的背压处理:对于转换流,同样需要处理背压。在 _transform
方法中,如果输出数据的速度较慢,也可能导致输入数据在转换流的缓冲区中堆积。可以通过控制 push()
方法的调用频率来处理背压。例如,当 push()
方法返回 false
时,表示输出缓冲区已满,此时可以暂停输入数据的处理,直到输出缓冲区有空间。
const { Transform } = require('stream');
class SlowTransform extends Transform {
constructor() {
super();
this.pauseInput = false;
}
_transform(chunk, encoding, callback) {
if (this.pauseInput) {
// 如果输出缓冲区已满,暂存数据,不处理
this.buffer.push(chunk);
return;
}
const transformedChunk = chunk.toString().toUpperCase();
const pushResult = this.push(transformedChunk);
if (!pushResult) {
this.pauseInput = true;
}
callback();
}
_flush(callback) {
// 处理暂存的数据
while (this.buffer.length > 0) {
const chunk = this.buffer.shift();
const transformedChunk = chunk.toString().toUpperCase();
const pushResult = this.push(transformedChunk);
if (!pushResult) {
break;
}
}
callback();
}
}
在上述代码中,SlowTransform
转换流通过 pauseInput
变量来控制是否暂停输入数据的处理。当 push()
方法返回 false
时,设置 pauseInput
为 true
,暂存输入数据。在 _flush
方法中,处理暂存的数据,直到输出缓冲区再次有空间。
通过合理处理背压,可以确保在处理大文件等场景下,数据处理管道的稳定性和高效性,避免因数据堆积导致的性能问题和内存问题。