MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Node.js中的流Stream工作原理及应用

2023-01-206.2k 阅读

Node.js 中的流 Stream 工作原理及应用

流的基本概念

在 Node.js 环境中,流(Stream)是一种用于处理流数据的抽象接口。流可以被看作是一个有序的、随时间推移而生成或消耗的数据片段序列。它提供了一种高效、内存友好的方式来处理大量数据,而不需要一次性将所有数据加载到内存中。

与传统的一次性读取整个文件或处理大块数据的方式不同,流允许我们逐块地处理数据。这在处理大文件、网络通信等场景中具有显著的优势,因为它可以减少内存的占用,提高应用程序的性能和稳定性。

流的类型

在 Node.js 中,主要有四种类型的流:

  1. 可读流(Readable Streams):用于从数据源读取数据,比如从文件系统读取文件内容、从网络接收数据等。可读流是数据的生产者,它会不断地产生数据片段供消费。
  2. 可写流(Writable Streams):用于向目的地写入数据,例如将数据写入文件、通过网络发送数据等。可写流是数据的消费者,它接收并处理从可读流或其他数据源传来的数据。
  3. 双工流(Duplex Streams):同时具备可读流和可写流的功能。它既可以作为数据的生产者,也可以作为数据的消费者。例如,在网络套接字(TCP Socket)中,数据既可以被发送(可写),也可以被接收(可读)。
  4. 转换流(Transform Streams):是双工流的一种特殊类型,它在数据从可读端流向可写端的过程中对数据进行转换。比如,对数据进行压缩、加密等操作。

可读流(Readable Streams)

  1. 工作原理 可读流在内部维护一个缓冲区,用于暂存从数据源读取的数据。当数据被读取到缓冲区后,可读流会通过事件通知应用程序有新数据可供处理。应用程序可以通过监听这些事件来读取和处理数据。

可读流有两种模式:暂停模式(paused mode)和流动模式(flowing mode)。

  • 暂停模式:在暂停模式下,可读流不会自动将数据推送给应用程序,需要应用程序手动调用 read() 方法来从缓冲区读取数据。这种模式适用于需要精确控制数据读取时机的场景。
  • 流动模式:在流动模式下,可读流会自动将数据推送给应用程序,应用程序通过监听 data 事件来接收数据。如果没有监听 data 事件,数据会被丢弃。这种模式适用于简单的数据处理场景,数据会持续流动直到被处理完或发生错误。
  1. 创建可读流 在 Node.js 中,可以使用 fs.createReadStream() 方法创建一个可读流来读取文件。例如:
const fs = require('fs');
const readableStream = fs.createReadStream('example.txt');

上述代码创建了一个用于读取 example.txt 文件的可读流。

  1. 事件监听 可读流提供了多个事件,常用的事件有:
  • data:当有新数据可读时触发,在流动模式下,每次触发该事件时,data 参数即为读取到的数据块。
readableStream.on('data', (chunk) => {
  console.log('Received data:', chunk.toString());
});
  • end:当所有数据都被读取完毕,并且缓冲区已清空时触发。
readableStream.on('end', () => {
  console.log('All data has been read.');
});
  • error:当读取过程中发生错误时触发。
readableStream.on('error', (err) => {
  console.error('Error reading file:', err);
});
  1. 手动读取数据(暂停模式) 在暂停模式下,可以通过 read() 方法手动读取数据。例如:
const fs = require('fs');
const readableStream = fs.createReadStream('example.txt');
readableStream.pause();

let data = '';
function readData() {
  let chunk;
  while (null!== (chunk = readableStream.read(1024))) {
    data += chunk.toString();
  }
  if (data) {
    console.log('Read data:', data);
  }
  if (readableStream.readableEnded) {
    console.log('All data has been read.');
  } else {
    process.nextTick(readData);
  }
}

readableStream.on('readable', readData);
readableStream.on('end', () => {
  console.log('All data has been read (end event).');
});
readableStream.on('error', (err) => {
  console.error('Error reading file:', err);
});

上述代码中,首先将可读流设置为暂停模式,然后通过 read() 方法每次读取 1024 字节的数据,直到没有数据可读为止。当缓冲区有新数据可读时,readable 事件会被触发,从而调用 readData 函数进行数据读取。

可写流(Writable Streams)

  1. 工作原理 可写流用于将数据写入到目标位置,如文件、网络连接等。它内部也有一个缓冲区,用于暂存等待写入的数据。当数据被写入缓冲区后,可写流会尽快将缓冲区中的数据写入到目标位置。如果缓冲区已满,而目标位置还不能立即接收更多数据时,可写流会处于背压(backpressure)状态,此时应用程序需要暂停向可写流写入数据,直到可写流再次准备好接收数据。

  2. 创建可写流 可以使用 fs.createWriteStream() 方法创建一个可写流来写入文件。例如:

const fs = require('fs');
const writableStream = fs.createWriteStream('output.txt');

上述代码创建了一个用于写入 output.txt 文件的可写流。

  1. 写入数据 使用 write() 方法向可写流写入数据。例如:
const fs = require('fs');
const writableStream = fs.createWriteStream('output.txt');
const data = 'Hello, World!';
const writeResult = writableStream.write(data);
if (!writeResult) {
  console.log('Buffer is full, need to pause writing.');
}

write() 方法返回一个布尔值,表示数据是否成功写入缓冲区。如果返回 false,说明缓冲区已满,需要暂停写入数据,直到 drain 事件触发。

  1. 事件监听 可写流常用的事件有:
  • drain:当缓冲区中的数据被成功写入目标位置,且缓冲区再次有空间接收新数据时触发。在 write() 方法返回 false 后,需要监听该事件,以便继续写入数据。
writableStream.on('drain', () => {
  console.log('Buffer has been drained, can resume writing.');
});
  • finish:当所有数据都已被写入并确认写入完成后触发。
writableStream.end();
writableStream.on('finish', () => {
  console.log('All data has been written.');
});
  • error:当写入过程中发生错误时触发。
writableStream.on('error', (err) => {
  console.error('Error writing file:', err);
});

双工流(Duplex Streams)

  1. 工作原理 双工流结合了可读流和可写流的功能,允许数据在两个方向上流动。例如,在网络套接字中,数据可以从客户端发送到服务器(可写操作),同时也可以从服务器接收数据(可读操作)。

  2. 创建双工流 Node.js 提供了 net.Socket 类作为双工流的一个典型示例。例如,创建一个 TCP 套接字:

const net = require('net');
const socket = new net.Socket();

这里的 socket 就是一个双工流,可以同时进行读取和写入操作。

  1. 使用双工流 在使用双工流时,可以像使用可读流和可写流一样监听事件和进行读写操作。例如,通过套接字发送和接收数据:
const net = require('net');
const socket = new net.Socket();

socket.connect(8080, 'localhost', () => {
  socket.write('Hello, Server!');
});

socket.on('data', (chunk) => {
  console.log('Received from server:', chunk.toString());
});

socket.on('end', () => {
  console.log('Connection closed by server.');
});

socket.on('error', (err) => {
  console.error('Socket error:', err);
});

上述代码中,通过 socket.write() 方法向服务器发送数据,同时通过监听 data 事件接收服务器返回的数据。

转换流(Transform Streams)

  1. 工作原理 转换流是双工流的一种特殊类型,它在数据从可读端流向可写端的过程中对数据进行转换。例如,在压缩文件时,可以使用转换流对文件内容进行压缩处理,然后将压缩后的数据写入到目标文件。

  2. 创建转换流 可以通过继承 stream.Transform 类来创建自定义的转换流。例如,创建一个简单的将字符串转换为大写的转换流:

const { Transform } = require('stream');

class UppercaseTransform extends Transform {
  constructor() {
    super();
  }
  _transform(chunk, encoding, callback) {
    const upperChunk = chunk.toString().toUpperCase();
    this.push(upperChunk);
    callback();
  }
}

在上述代码中,通过重写 _transform 方法,将接收到的数据块转换为大写,并通过 push() 方法将转换后的数据推送到可写端。

  1. 使用转换流 可以将转换流连接到可读流和可写流之间,实现数据的转换和传输。例如:
const fs = require('fs');
const { Transform } = require('stream');

class UppercaseTransform extends Transform {
  constructor() {
    super();
  }
  _transform(chunk, encoding, callback) {
    const upperChunk = chunk.toString().toUpperCase();
    this.push(upperChunk);
    callback();
  }
}

const readableStream = fs.createReadStream('input.txt');
const transformStream = new UppercaseTransform();
const writableStream = fs.createWriteStream('output.txt');

readableStream.pipe(transformStream).pipe(writableStream);

上述代码中,从 input.txt 文件读取的数据经过 UppercaseTransform 转换流转换为大写后,写入到 output.txt 文件中。

流的管道(Pipe)

  1. 管道的概念 管道是 Node.js 流的一个重要特性,它提供了一种简洁的方式来连接可读流、可写流(包括转换流),使得数据能够自动地从可读流流向可写流。通过管道,数据在流动过程中可以被转换流进行处理,从而实现复杂的数据处理流程。

  2. 使用管道 使用 pipe() 方法可以轻松地建立管道连接。例如:

const fs = require('fs');
const zlib = require('zlib');

const readableStream = fs.createReadStream('largeFile.txt');
const gzipStream = zlib.createGzip();
const writableStream = fs.createWriteStream('largeFile.gz');

readableStream.pipe(gzipStream).pipe(writableStream);

在上述代码中,readableStream 读取 largeFile.txt 的内容,通过 gzipStream(一个转换流,用于压缩数据)进行压缩,然后将压缩后的数据写入到 largeFile.gz 文件中。

  1. 管道的优点
  • 自动处理背压:当可写流处理速度较慢,导致缓冲区满时,管道会自动暂停可读流的读取,避免数据丢失。当可写流缓冲区有空间时,可读流会自动恢复读取。
  • 简洁的代码:通过管道,可以用较少的代码实现复杂的数据处理流程,提高代码的可读性和可维护性。

实际应用场景

  1. 文件处理 在处理大文件时,使用流可以避免一次性将整个文件加载到内存中,提高系统性能。例如,将一个大的文本文件进行逐行处理:
const fs = require('fs');
const readableStream = fs.createReadStream('largeTextFile.txt', { encoding: 'utf8' });
let line = '';
readableStream.on('data', (chunk) => {
  const lines = (line + chunk).split('\n');
  line = lines.pop();
  lines.forEach((l) => {
    // 处理每一行数据
    console.log('Processing line:', l);
  });
});
readableStream.on('end', () => {
  if (line) {
    // 处理最后一行数据
    console.log('Processing last line:', line);
  }
});
  1. 网络通信 在网络编程中,流被广泛应用于处理网络数据的收发。例如,使用 HTTP 服务器接收和处理客户端请求:
const http = require('http');
const server = http.createServer((req, res) => {
  let data = '';
  req.on('data', (chunk) => {
    data += chunk.toString();
  });
  req.on('end', () => {
    // 处理接收到的请求数据
    console.log('Received request data:', data);
    res.writeHead(200, { 'Content-Type': 'text/plain' });
    res.end('Hello, Client!');
  });
});
server.listen(3000, () => {
  console.log('Server is running on port 3000.');
});
  1. 数据转换与处理 在数据处理过程中,转换流可以对数据进行各种转换操作。比如,在处理日志文件时,可能需要对日志数据进行格式化、过滤等操作:
const fs = require('fs');
const { Transform } = require('stream');

class LogFilterTransform extends Transform {
  constructor() {
    super();
  }
  _transform(chunk, encoding, callback) {
    const logLine = chunk.toString();
    if (logLine.includes('ERROR')) {
      this.push(logLine);
    }
    callback();
  }
}

const readableStream = fs.createReadStream('logFile.txt', { encoding: 'utf8' });
const transformStream = new LogFilterTransform();
const writableStream = fs.createWriteStream('errorLogFile.txt');

readableStream.pipe(transformStream).pipe(writableStream);

上述代码中,LogFilterTransform 转换流会过滤掉不包含 ERROR 的日志行,并将包含 ERROR 的日志行写入到 errorLogFile.txt 文件中。

  1. 实时数据处理 在实时数据处理场景中,如处理实时传感器数据或实时日志流,流的高效处理特性尤为重要。例如,模拟实时接收传感器数据并进行简单计算:
const { PassThrough } = require('stream');

// 模拟传感器数据生成
function generateSensorData() {
  const sensorStream = new PassThrough({ objectMode: true });
  setInterval(() => {
    const data = Math.random() * 100;
    sensorStream.write({ value: data });
  }, 1000);
  return sensorStream;
}

const sensorStream = generateSensorData();
let sum = 0;
let count = 0;
sensorStream.on('data', (chunk) => {
  sum += chunk.value;
  count++;
  const average = sum / count;
  console.log(`Current average: ${average}`);
});

上述代码通过 PassThrough 流模拟实时传感器数据的生成,并实时计算接收到数据的平均值。

通过深入理解 Node.js 中流的工作原理和应用场景,并结合各种类型的流和管道操作,开发者可以构建高效、稳定且内存友好的应用程序,特别是在处理大量数据和实时数据的场景中。无论是文件处理、网络通信还是复杂的数据转换任务,流都提供了强大而灵活的解决方案。在实际开发中,合理运用流的特性可以显著提升应用程序的性能和可扩展性。