MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Node.js 可读流 Readable 的基本用法

2023-12-242.5k 阅读

Node.js 可读流 Readable 的基本概念

在 Node.js 中,可读流(Readable Stream)是一种抽象接口,用于从源(如文件系统、网络连接等)读取数据。可读流是一种基于事件驱动的机制,它允许我们以高效且非阻塞的方式处理大量数据。

Node.js 中的流分为可读流、可写流、双向流(Duplex Stream)和转换流(Transform Stream)。可读流专门用于读取数据,比如读取文件内容、从网络套接字接收数据等场景。

可读流的创建通常通过特定的模块方法,例如 fs.createReadStream() 用于从文件系统创建可读流,http.IncomingMessage 则是 HTTP 服务器端接收到请求时的可读流实例,用于读取请求数据。

可读流的两种模式

可读流存在两种模式:暂停模式(paused mode)和流动模式(flowing mode)。这两种模式决定了数据在流中的处理方式。

暂停模式

在暂停模式下,数据不会自动流动。只有当我们显式调用 read() 方法时,数据才会从底层资源读取并返回。这在我们需要精确控制数据读取时机和读取量时非常有用。例如,在处理一些复杂的解析逻辑,需要逐块处理数据,避免一次性读取过多数据导致内存压力过大的场景。

流动模式

在流动模式下,数据会自动从底层资源流向应用程序。当我们为可读流添加 data 事件监听器,或者调用 resume() 方法时,可读流就会进入流动模式。在这种模式下,数据会源源不断地被读取并通过 data 事件传递给我们的回调函数进行处理。这种模式适用于对数据处理速度要求较高,并且能够及时处理接收到的数据的场景。

可读流的重要属性

  1. readable:表示流是否可读。当流处于可读状态时,该属性为 true。在流的生命周期中,这个属性的值会根据流的状态变化而改变,比如在流打开准备读取数据时为 true,当流结束或者出现错误不可读时为 false
  2. readableLength:表示流当前缓冲区中等待被读取的数据字节数。这个属性对于我们了解当前流中已缓存的数据量非常有帮助,特别是在需要根据缓存数据量来调整读取策略的场景下。例如,当缓存数据量达到一定阈值时,可以暂停读取,先处理已缓存的数据,避免内存占用过高。
  3. highWaterMark:这是一个重要的配置属性,它定义了内部缓冲区的大小。当缓冲区中的数据量达到 highWaterMark 时,流的行为可能会发生变化,比如在写入操作中可能会暂停写入,直到缓冲区的数据被消费。在可读流中,它也影响着数据的读取和缓冲策略。

可读流的重要事件

  1. data:当有新的数据可读时,会触发这个事件。在流动模式下,这个事件会频繁触发,每次触发都会传递一个 chunk 参数,即读取到的数据块。例如:
const fs = require('fs');
const readableStream = fs.createReadStream('example.txt');
readableStream.on('data', (chunk) => {
    console.log('Received data chunk:', chunk.length);
});
  1. end:当流中没有更多数据可读,并且所有数据都已被处理完毕时,会触发该事件。这通常意味着流的生命周期结束。比如,当我们读取完一个文件的所有内容后,就会触发 end 事件。
const fs = require('fs');
const readableStream = fs.createReadStream('example.txt');
readableStream.on('end', () => {
    console.log('All data has been read.');
});
  1. error:当流在读取数据过程中发生错误时,会触发这个事件。错误可能是由于文件不存在、权限不足、网络故障等原因导致的。我们应该始终为可读流添加 error 事件监听器,以便及时捕获并处理错误,避免程序崩溃。
const fs = require('fs');
const readableStream = fs.createReadStream('nonexistent.txt');
readableStream.on('error', (err) => {
    console.error('Error occurred:', err.message);
});
  1. pause:当流从流动模式切换到暂停模式时,会触发这个事件。这通常发生在我们调用 pause() 方法或者手动控制流的读取节奏时。例如,当缓冲区数据量达到一定程度,为了避免内存溢出,我们可以暂停流,处理完缓冲区数据后再恢复读取。
const fs = require('fs');
const readableStream = fs.createReadStream('example.txt');
readableStream.on('pause', () => {
    console.log('Stream has been paused.');
});
// 模拟一段时间后暂停流
setTimeout(() => {
    readableStream.pause();
}, 2000);
  1. resume:当流从暂停模式切换回流动模式时,会触发这个事件。通常是在我们调用 resume() 方法后触发。比如,当我们处理完缓冲区数据,希望继续读取剩余数据时,就可以调用 resume() 方法,此时会触发 resume 事件。
const fs = require('fs');
const readableStream = fs.createReadStream('example.txt');
readableStream.on('resume', () => {
    console.log('Stream has been resumed.');
});
// 先暂停流
setTimeout(() => {
    readableStream.pause();
}, 2000);
// 一段时间后恢复流
setTimeout(() => {
    readableStream.resume();
}, 4000);

可读流的重要方法

  1. read([size]):该方法用于从流中读取数据。如果不传递 size 参数,它会尝试读取尽可能多的数据并返回。如果传递了 size 参数,则会尝试读取指定字节数的数据。在暂停模式下,我们需要手动调用这个方法来读取数据。例如:
const fs = require('fs');
const readableStream = fs.createReadStream('example.txt', { encoding: 'utf8', highWaterMark: 16 });
let chunk;
while (null!== (chunk = readableStream.read(16))) {
    console.log('Read data chunk:', chunk);
}

在上述代码中,我们每次读取 16 字节的数据,通过 while 循环持续读取,直到 read() 方法返回 null,表示没有更多数据可读。

  1. pause():用于将可读流从流动模式切换到暂停模式。调用该方法后,数据将不再自动流动,需要手动调用 read() 方法来读取数据。这在我们需要临时停止数据流动,进行一些其他处理时非常有用。比如,在处理网络流时,当我们的缓冲区已满,需要暂停接收数据,处理完缓冲区数据后再恢复接收。
const fs = require('fs');
const readableStream = fs.createReadStream('example.txt');
// 模拟一段时间后暂停流
setTimeout(() => {
    readableStream.pause();
    console.log('Stream has been paused.');
}, 2000);
  1. resume():用于将可读流从暂停模式切换回流动模式。调用该方法后,数据将重新开始自动流动,流会持续触发 data 事件来传递读取到的数据。例如:
const fs = require('fs');
const readableStream = fs.createReadStream('example.txt');
// 先暂停流
setTimeout(() => {
    readableStream.pause();
    console.log('Stream has been paused.');
}, 2000);
// 一段时间后恢复流
setTimeout(() => {
    readableStream.resume();
    console.log('Stream has been resumed.');
}, 4000);
  1. pipe(destination[, options]):这是一个非常强大的方法,它用于将可读流的数据直接导向可写流。它会自动处理数据的流动和背压问题。例如,我们可以将一个文件的可读流直接管道到另一个文件的可写流,实现文件的复制:
const fs = require('fs');
const readableStream = fs.createReadStream('source.txt');
const writableStream = fs.createWriteStream('destination.txt');
readableStream.pipe(writableStream);

在上述代码中,source.txt 的内容会通过管道直接写入到 destination.txt 中。pipe 方法内部会自动管理数据的读取和写入,以及处理缓冲区满等背压情况,大大简化了我们处理数据流动的代码。

自定义可读流

在某些情况下,我们可能需要创建自定义的可读流,以满足特定的需求。例如,从自定义的数据源(如数据库、内存缓存等)读取数据。要创建自定义可读流,我们需要继承 stream.Readable 类,并实现其 _read() 方法。

const { Readable } = require('stream');

class CustomReadableStream extends Readable {
    constructor(dataArray) {
        super();
        this.dataArray = dataArray;
        this.currentIndex = 0;
    }

    _read(size) {
        while (this.currentIndex < this.dataArray.length) {
            const chunk = this.dataArray[this.currentIndex];
            this.push(chunk);
            this.currentIndex++;
            if (this.push(null)) {
                break;
            }
        }
    }
}

// 使用自定义可读流
const dataArray = ['chunk1', 'chunk2', 'chunk3'];
const customReadable = new CustomReadableStream(dataArray);
customReadable.on('data', (chunk) => {
    console.log('Received custom data chunk:', chunk);
});
customReadable.on('end', () => {
    console.log('All custom data has been read.');
});

在上述代码中,我们创建了一个 CustomReadableStream 类,继承自 stream.Readable。在 _read() 方法中,我们从 dataArray 中依次取出数据块,并通过 push() 方法将数据块推送到流中。当所有数据都被推送完后,我们通过 push(null) 来表示流的结束。

可读流与背压处理

背压(Backpressure)是指在数据流动过程中,由于下游处理速度跟不上上游生产速度,导致数据积压的问题。在可读流的场景中,当我们使用 pipe() 方法将可读流连接到可写流时,如果可写流的写入速度较慢,就可能会出现背压。

Node.js 的可读流和可写流通过一些机制来处理背压。例如,当可写流的缓冲区已满时,它会返回 false 给可读流,通知可读流暂停发送数据。可读流接收到这个信号后,会暂停读取数据,直到可写流再次准备好接收数据。

const fs = require('fs');
const readableStream = fs.createReadStream('largeFile.txt');
const writableStream = fs.createWriteStream('output.txt');

readableStream.on('data', (chunk) => {
    const writeResult = writableStream.write(chunk);
    if (!writeResult) {
        console.log('Write buffer is full, pausing readable stream.');
        readableStream.pause();
    }
});

writableStream.on('drain', () => {
    console.log('Write buffer has drained, resuming readable stream.');
    readableStream.resume();
});

readableStream.on('end', () => {
    writableStream.end();
});

在上述代码中,我们手动处理背压。当 write() 方法返回 false 时,我们暂停可读流。当可写流的 drain 事件触发时,说明缓冲区已排空,我们恢复可读流。这样可以有效地避免数据积压,保证数据的稳定流动。

可读流在实际项目中的应用场景

  1. 文件读取与处理:在文件处理场景中,可读流被广泛应用。比如,我们需要读取一个大文件并进行逐行解析,或者将文件内容进行格式转换等操作。通过可读流,我们可以逐块读取文件内容,避免一次性加载整个大文件到内存中,从而提高程序的性能和稳定性。
const fs = require('fs');
const readableStream = fs.createReadStream('largeCSVFile.csv', { encoding: 'utf8' });
let line = '';
readableStream.on('data', (chunk) => {
    const newLines = chunk.split('\n');
    for (let i = 0; i < newLines.length - 1; i++) {
        line += newLines[i];
        // 处理每一行数据
        console.log('Processing line:', line);
        line = '';
    }
    line += newLines[newLines.length - 1];
});
readableStream.on('end', () => {
    if (line.length > 0) {
        // 处理最后一行数据
        console.log('Processing last line:', line);
    }
});
  1. 网络数据接收:在网络编程中,特别是处理 HTTP 请求、WebSocket 连接等场景,可读流用于接收来自客户端或服务器端的数据。例如,在 HTTP 服务器中,http.IncomingMessage 就是一个可读流,我们可以通过它来读取客户端发送的请求数据。
const http = require('http');
const server = http.createServer((req, res) => {
    let data = '';
    req.on('data', (chunk) => {
        data += chunk.toString();
    });
    req.on('end', () => {
        console.log('Received request data:', data);
        res.end('Data received successfully.');
    });
});
server.listen(3000, () => {
    console.log('Server is running on port 3000.');
});
  1. 数据处理管道:在一些复杂的数据处理流程中,我们可能需要将多个流连接起来形成一个数据处理管道。例如,从文件读取数据,经过一系列转换(如压缩、加密等),最后写入到另一个存储介质。可读流作为管道的起点,为整个处理流程提供数据输入。
const fs = require('fs');
const zlib = require('zlib');
const readableStream = fs.createReadStream('largeFile.txt');
const gzipStream = zlib.createGzip();
const writableStream = fs.createWriteStream('compressedFile.gz');
readableStream.pipe(gzipStream).pipe(writableStream);

在上述代码中,我们将文件的可读流通过管道依次连接到 gzip 转换流和文件的可写流,实现了文件的压缩并保存。

可读流的性能优化

  1. 合理设置 highWaterMarkhighWaterMark 的值直接影响缓冲区的大小。如果设置过小,可能会导致频繁的读取操作,增加系统开销;如果设置过大,可能会占用过多内存。我们需要根据实际应用场景和数据量来合理设置这个值。例如,对于处理大文件读取,并且内存有限的情况下,可以适当减小 highWaterMark 的值,以减少内存占用。
  2. 避免不必要的事件监听:每个事件监听器都会占用一定的资源,特别是在处理大量数据和频繁触发事件的情况下。尽量只添加必要的事件监听器,并且在不需要时及时移除。例如,如果在某个阶段只关心 dataend 事件,就不要添加其他不必要的事件监听器。
  3. 优化数据处理逻辑:在 data 事件的回调函数中,尽量避免复杂的同步操作。如果必须进行复杂处理,可以考虑将其放到异步任务中,比如使用 setImmediate() 或者 process.nextTick() 来将处理逻辑推迟到下一个事件循环执行,避免阻塞事件循环,影响流的正常数据流动。

总结

Node.js 的可读流是一种强大且灵活的工具,用于高效地从各种数据源读取数据。通过理解其基本概念、两种模式、重要属性、事件和方法,我们能够更好地利用可读流来处理数据,无论是在文件操作、网络编程还是复杂的数据处理管道中。同时,合理处理背压问题以及进行性能优化,能够确保我们的应用程序在处理大量数据时保持高效和稳定。希望通过本文的介绍,你对 Node.js 可读流的基本用法有了更深入的理解和掌握,并能够在实际项目中灵活运用。