Node.js 可读流 Readable 的基本用法
Node.js 可读流 Readable 的基本概念
在 Node.js 中,可读流(Readable Stream)是一种抽象接口,用于从源(如文件系统、网络连接等)读取数据。可读流是一种基于事件驱动的机制,它允许我们以高效且非阻塞的方式处理大量数据。
Node.js 中的流分为可读流、可写流、双向流(Duplex Stream)和转换流(Transform Stream)。可读流专门用于读取数据,比如读取文件内容、从网络套接字接收数据等场景。
可读流的创建通常通过特定的模块方法,例如 fs.createReadStream()
用于从文件系统创建可读流,http.IncomingMessage
则是 HTTP 服务器端接收到请求时的可读流实例,用于读取请求数据。
可读流的两种模式
可读流存在两种模式:暂停模式(paused mode)和流动模式(flowing mode)。这两种模式决定了数据在流中的处理方式。
暂停模式
在暂停模式下,数据不会自动流动。只有当我们显式调用 read()
方法时,数据才会从底层资源读取并返回。这在我们需要精确控制数据读取时机和读取量时非常有用。例如,在处理一些复杂的解析逻辑,需要逐块处理数据,避免一次性读取过多数据导致内存压力过大的场景。
流动模式
在流动模式下,数据会自动从底层资源流向应用程序。当我们为可读流添加 data
事件监听器,或者调用 resume()
方法时,可读流就会进入流动模式。在这种模式下,数据会源源不断地被读取并通过 data
事件传递给我们的回调函数进行处理。这种模式适用于对数据处理速度要求较高,并且能够及时处理接收到的数据的场景。
可读流的重要属性
readable
:表示流是否可读。当流处于可读状态时,该属性为true
。在流的生命周期中,这个属性的值会根据流的状态变化而改变,比如在流打开准备读取数据时为true
,当流结束或者出现错误不可读时为false
。readableLength
:表示流当前缓冲区中等待被读取的数据字节数。这个属性对于我们了解当前流中已缓存的数据量非常有帮助,特别是在需要根据缓存数据量来调整读取策略的场景下。例如,当缓存数据量达到一定阈值时,可以暂停读取,先处理已缓存的数据,避免内存占用过高。highWaterMark
:这是一个重要的配置属性,它定义了内部缓冲区的大小。当缓冲区中的数据量达到highWaterMark
时,流的行为可能会发生变化,比如在写入操作中可能会暂停写入,直到缓冲区的数据被消费。在可读流中,它也影响着数据的读取和缓冲策略。
可读流的重要事件
data
:当有新的数据可读时,会触发这个事件。在流动模式下,这个事件会频繁触发,每次触发都会传递一个chunk
参数,即读取到的数据块。例如:
const fs = require('fs');
const readableStream = fs.createReadStream('example.txt');
readableStream.on('data', (chunk) => {
console.log('Received data chunk:', chunk.length);
});
end
:当流中没有更多数据可读,并且所有数据都已被处理完毕时,会触发该事件。这通常意味着流的生命周期结束。比如,当我们读取完一个文件的所有内容后,就会触发end
事件。
const fs = require('fs');
const readableStream = fs.createReadStream('example.txt');
readableStream.on('end', () => {
console.log('All data has been read.');
});
error
:当流在读取数据过程中发生错误时,会触发这个事件。错误可能是由于文件不存在、权限不足、网络故障等原因导致的。我们应该始终为可读流添加error
事件监听器,以便及时捕获并处理错误,避免程序崩溃。
const fs = require('fs');
const readableStream = fs.createReadStream('nonexistent.txt');
readableStream.on('error', (err) => {
console.error('Error occurred:', err.message);
});
pause
:当流从流动模式切换到暂停模式时,会触发这个事件。这通常发生在我们调用pause()
方法或者手动控制流的读取节奏时。例如,当缓冲区数据量达到一定程度,为了避免内存溢出,我们可以暂停流,处理完缓冲区数据后再恢复读取。
const fs = require('fs');
const readableStream = fs.createReadStream('example.txt');
readableStream.on('pause', () => {
console.log('Stream has been paused.');
});
// 模拟一段时间后暂停流
setTimeout(() => {
readableStream.pause();
}, 2000);
resume
:当流从暂停模式切换回流动模式时,会触发这个事件。通常是在我们调用resume()
方法后触发。比如,当我们处理完缓冲区数据,希望继续读取剩余数据时,就可以调用resume()
方法,此时会触发resume
事件。
const fs = require('fs');
const readableStream = fs.createReadStream('example.txt');
readableStream.on('resume', () => {
console.log('Stream has been resumed.');
});
// 先暂停流
setTimeout(() => {
readableStream.pause();
}, 2000);
// 一段时间后恢复流
setTimeout(() => {
readableStream.resume();
}, 4000);
可读流的重要方法
read([size])
:该方法用于从流中读取数据。如果不传递size
参数,它会尝试读取尽可能多的数据并返回。如果传递了size
参数,则会尝试读取指定字节数的数据。在暂停模式下,我们需要手动调用这个方法来读取数据。例如:
const fs = require('fs');
const readableStream = fs.createReadStream('example.txt', { encoding: 'utf8', highWaterMark: 16 });
let chunk;
while (null!== (chunk = readableStream.read(16))) {
console.log('Read data chunk:', chunk);
}
在上述代码中,我们每次读取 16 字节的数据,通过 while
循环持续读取,直到 read()
方法返回 null
,表示没有更多数据可读。
pause()
:用于将可读流从流动模式切换到暂停模式。调用该方法后,数据将不再自动流动,需要手动调用read()
方法来读取数据。这在我们需要临时停止数据流动,进行一些其他处理时非常有用。比如,在处理网络流时,当我们的缓冲区已满,需要暂停接收数据,处理完缓冲区数据后再恢复接收。
const fs = require('fs');
const readableStream = fs.createReadStream('example.txt');
// 模拟一段时间后暂停流
setTimeout(() => {
readableStream.pause();
console.log('Stream has been paused.');
}, 2000);
resume()
:用于将可读流从暂停模式切换回流动模式。调用该方法后,数据将重新开始自动流动,流会持续触发data
事件来传递读取到的数据。例如:
const fs = require('fs');
const readableStream = fs.createReadStream('example.txt');
// 先暂停流
setTimeout(() => {
readableStream.pause();
console.log('Stream has been paused.');
}, 2000);
// 一段时间后恢复流
setTimeout(() => {
readableStream.resume();
console.log('Stream has been resumed.');
}, 4000);
pipe(destination[, options])
:这是一个非常强大的方法,它用于将可读流的数据直接导向可写流。它会自动处理数据的流动和背压问题。例如,我们可以将一个文件的可读流直接管道到另一个文件的可写流,实现文件的复制:
const fs = require('fs');
const readableStream = fs.createReadStream('source.txt');
const writableStream = fs.createWriteStream('destination.txt');
readableStream.pipe(writableStream);
在上述代码中,source.txt
的内容会通过管道直接写入到 destination.txt
中。pipe
方法内部会自动管理数据的读取和写入,以及处理缓冲区满等背压情况,大大简化了我们处理数据流动的代码。
自定义可读流
在某些情况下,我们可能需要创建自定义的可读流,以满足特定的需求。例如,从自定义的数据源(如数据库、内存缓存等)读取数据。要创建自定义可读流,我们需要继承 stream.Readable
类,并实现其 _read()
方法。
const { Readable } = require('stream');
class CustomReadableStream extends Readable {
constructor(dataArray) {
super();
this.dataArray = dataArray;
this.currentIndex = 0;
}
_read(size) {
while (this.currentIndex < this.dataArray.length) {
const chunk = this.dataArray[this.currentIndex];
this.push(chunk);
this.currentIndex++;
if (this.push(null)) {
break;
}
}
}
}
// 使用自定义可读流
const dataArray = ['chunk1', 'chunk2', 'chunk3'];
const customReadable = new CustomReadableStream(dataArray);
customReadable.on('data', (chunk) => {
console.log('Received custom data chunk:', chunk);
});
customReadable.on('end', () => {
console.log('All custom data has been read.');
});
在上述代码中,我们创建了一个 CustomReadableStream
类,继承自 stream.Readable
。在 _read()
方法中,我们从 dataArray
中依次取出数据块,并通过 push()
方法将数据块推送到流中。当所有数据都被推送完后,我们通过 push(null)
来表示流的结束。
可读流与背压处理
背压(Backpressure)是指在数据流动过程中,由于下游处理速度跟不上上游生产速度,导致数据积压的问题。在可读流的场景中,当我们使用 pipe()
方法将可读流连接到可写流时,如果可写流的写入速度较慢,就可能会出现背压。
Node.js 的可读流和可写流通过一些机制来处理背压。例如,当可写流的缓冲区已满时,它会返回 false
给可读流,通知可读流暂停发送数据。可读流接收到这个信号后,会暂停读取数据,直到可写流再次准备好接收数据。
const fs = require('fs');
const readableStream = fs.createReadStream('largeFile.txt');
const writableStream = fs.createWriteStream('output.txt');
readableStream.on('data', (chunk) => {
const writeResult = writableStream.write(chunk);
if (!writeResult) {
console.log('Write buffer is full, pausing readable stream.');
readableStream.pause();
}
});
writableStream.on('drain', () => {
console.log('Write buffer has drained, resuming readable stream.');
readableStream.resume();
});
readableStream.on('end', () => {
writableStream.end();
});
在上述代码中,我们手动处理背压。当 write()
方法返回 false
时,我们暂停可读流。当可写流的 drain
事件触发时,说明缓冲区已排空,我们恢复可读流。这样可以有效地避免数据积压,保证数据的稳定流动。
可读流在实际项目中的应用场景
- 文件读取与处理:在文件处理场景中,可读流被广泛应用。比如,我们需要读取一个大文件并进行逐行解析,或者将文件内容进行格式转换等操作。通过可读流,我们可以逐块读取文件内容,避免一次性加载整个大文件到内存中,从而提高程序的性能和稳定性。
const fs = require('fs');
const readableStream = fs.createReadStream('largeCSVFile.csv', { encoding: 'utf8' });
let line = '';
readableStream.on('data', (chunk) => {
const newLines = chunk.split('\n');
for (let i = 0; i < newLines.length - 1; i++) {
line += newLines[i];
// 处理每一行数据
console.log('Processing line:', line);
line = '';
}
line += newLines[newLines.length - 1];
});
readableStream.on('end', () => {
if (line.length > 0) {
// 处理最后一行数据
console.log('Processing last line:', line);
}
});
- 网络数据接收:在网络编程中,特别是处理 HTTP 请求、WebSocket 连接等场景,可读流用于接收来自客户端或服务器端的数据。例如,在 HTTP 服务器中,
http.IncomingMessage
就是一个可读流,我们可以通过它来读取客户端发送的请求数据。
const http = require('http');
const server = http.createServer((req, res) => {
let data = '';
req.on('data', (chunk) => {
data += chunk.toString();
});
req.on('end', () => {
console.log('Received request data:', data);
res.end('Data received successfully.');
});
});
server.listen(3000, () => {
console.log('Server is running on port 3000.');
});
- 数据处理管道:在一些复杂的数据处理流程中,我们可能需要将多个流连接起来形成一个数据处理管道。例如,从文件读取数据,经过一系列转换(如压缩、加密等),最后写入到另一个存储介质。可读流作为管道的起点,为整个处理流程提供数据输入。
const fs = require('fs');
const zlib = require('zlib');
const readableStream = fs.createReadStream('largeFile.txt');
const gzipStream = zlib.createGzip();
const writableStream = fs.createWriteStream('compressedFile.gz');
readableStream.pipe(gzipStream).pipe(writableStream);
在上述代码中,我们将文件的可读流通过管道依次连接到 gzip
转换流和文件的可写流,实现了文件的压缩并保存。
可读流的性能优化
- 合理设置
highWaterMark
:highWaterMark
的值直接影响缓冲区的大小。如果设置过小,可能会导致频繁的读取操作,增加系统开销;如果设置过大,可能会占用过多内存。我们需要根据实际应用场景和数据量来合理设置这个值。例如,对于处理大文件读取,并且内存有限的情况下,可以适当减小highWaterMark
的值,以减少内存占用。 - 避免不必要的事件监听:每个事件监听器都会占用一定的资源,特别是在处理大量数据和频繁触发事件的情况下。尽量只添加必要的事件监听器,并且在不需要时及时移除。例如,如果在某个阶段只关心
data
和end
事件,就不要添加其他不必要的事件监听器。 - 优化数据处理逻辑:在
data
事件的回调函数中,尽量避免复杂的同步操作。如果必须进行复杂处理,可以考虑将其放到异步任务中,比如使用setImmediate()
或者process.nextTick()
来将处理逻辑推迟到下一个事件循环执行,避免阻塞事件循环,影响流的正常数据流动。
总结
Node.js 的可读流是一种强大且灵活的工具,用于高效地从各种数据源读取数据。通过理解其基本概念、两种模式、重要属性、事件和方法,我们能够更好地利用可读流来处理数据,无论是在文件操作、网络编程还是复杂的数据处理管道中。同时,合理处理背压问题以及进行性能优化,能够确保我们的应用程序在处理大量数据时保持高效和稳定。希望通过本文的介绍,你对 Node.js 可读流的基本用法有了更深入的理解和掌握,并能够在实际项目中灵活运用。