MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Node.js 可写流 Writable 的高级应用

2022-12-012.1k 阅读

理解 Node.js 可写流 Writable 的基本原理

在 Node.js 中,可写流(Writable)是一种用于向外部目的地写入数据的流类型。它是流模块的重要组成部分,在处理大量数据输出时非常有用,比如写入文件、发送网络请求数据等场景。

从底层实现来看,Writable 流基于事件驱动机制工作。当数据被写入流时,它会先进入内部缓冲区。如果缓冲区未满,write() 方法会立即返回 true,表示可以继续写入更多数据。当缓冲区已满时,write() 方法会返回 false,此时调用者需要暂停写入数据,直到 drain 事件被触发,表明缓冲区又有空间可以写入数据了。

下面是一个简单的示例,展示如何创建一个基本的可写流并向其写入数据:

const { Writable } = require('stream');

// 创建一个可写流实例
const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // chunk 是要写入的数据块
    // encoding 是数据块的编码
    // callback 是写入完成后要调用的回调函数
    console.log('Received chunk:', chunk.toString());
    callback();
  }
});

// 向可写流写入数据
myWritable.write('Hello, ');
myWritable.write('world!');

// 结束写入流
myWritable.end();

在这个示例中,我们创建了一个自定义的可写流 myWritablewrite 方法会在每次接收到数据块时被调用,我们在这个方法中简单地将接收到的数据块打印出来,然后调用 callback 表示写入完成。

Writable 流的事件

drain 事件

drain 事件是 Writable 流中非常重要的事件。当内部缓冲区被排空,即有空间再次写入数据时,会触发该事件。在缓冲区已满,write() 方法返回 false 后,需要监听这个事件,以便知道何时可以继续写入数据。

const { Writable } = require('stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // 模拟一个缓慢的写入操作
    setTimeout(() => {
      console.log('Received chunk:', chunk.toString());
      callback();
    }, 100);
  }
});

let dataToWrite = 'A'.repeat(100000);
let writeResult = myWritable.write(dataToWrite);

if (!writeResult) {
  console.log('Buffer is full, waiting for drain event...');
  myWritable.once('drain', () => {
    console.log('Buffer has drained, can write more data.');
    myWritable.write('Additional data');
    myWritable.end();
  });
} else {
  myWritable.end();
}

在这个例子中,我们通过 setTimeout 模拟一个缓慢的写入操作,使得缓冲区很快被填满。当 write() 方法返回 false 时,我们监听 drain 事件,当事件触发时,就可以继续写入数据。

finish 事件

finish 事件在调用 end() 方法且所有数据都已被处理完毕时触发。这意味着所有写入流中的数据都已经被完全处理,并且没有更多数据会被写入。

const { Writable } = require('stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    console.log('Received chunk:', chunk.toString());
    callback();
  }
});

myWritable.write('Part 1');
myWritable.write('Part 2');
myWritable.end('Part 3');

myWritable.on('finish', () => {
  console.log('All data has been written and processed.');
});

在这个示例中,我们向可写流写入了三部分数据,最后调用 end() 方法并传入最后一部分数据。当所有数据都被处理完毕后,finish 事件会被触发。

pipe 相关事件

pipe 方法用于将可读流的数据自动传输到可写流。在这个过程中,会涉及到一些相关事件。

  • pipe 事件:当使用 pipe 方法连接可读流和可写流时触发。
  • unpipe 事件:当使用 unpipe 方法断开可读流和可写流的连接时触发。
const { Readable, Writable } = require('stream');

const readable = new Readable({
  read() {
    this.push('Hello, ');
    this.push('world!');
    this.push(null);
  }
});

const writable = new Writable({
  write(chunk, encoding, callback) {
    console.log('Received chunk:', chunk.toString());
    callback();
  }
});

readable.pipe(writable);

readable.on('pipe', () => {
  console.log('Pipe connection established.');
});

// 假设在某个时候需要断开连接
setTimeout(() => {
  readable.unpipe(writable);
  readable.on('unpipe', () => {
    console.log('Pipe connection disconnected.');
  });
}, 2000);

在这个例子中,我们创建了一个可读流 readable 和一个可写流 writable,并使用 pipe 方法将它们连接起来。pipe 事件在连接建立时触发,unpipe 事件在连接断开时触发。

高级应用场景

日志写入

在应用程序开发中,日志记录是非常重要的。使用 Writable 流可以高效地处理日志写入操作。我们可以创建一个自定义的日志可写流,将日志数据写入文件或者其他存储介质。

const { Writable } = require('stream');
const fs = require('fs');

class Logger extends Writable {
  constructor(options) {
    super(options);
    this.file = fs.createWriteStream('app.log', { flags: 'a' });
  }

  _write(chunk, encoding, callback) {
    const timestamp = new Date().toISOString();
    const logMessage = `${timestamp} ${chunk.toString()}\n`;
    this.file.write(logMessage, (err) => {
      if (err) {
        console.error('Error writing to log file:', err);
      }
      callback(err);
    });
  }

  _final(callback) {
    this.file.end();
    callback();
  }
}

const logger = new Logger();

logger.write('Application started');
logger.write('User logged in');
logger.end();

在这个示例中,我们创建了一个 Logger 类,继承自 Writable 流。在 _write 方法中,我们为每条日志添加时间戳,并写入到 app.log 文件中。_final 方法在流结束时关闭文件。

数据转换与输出

Writable 流可以用于数据转换并输出到目标。例如,将 JSON 数据转换为 CSV 格式并写入文件。

const { Writable } = require('stream');
const fs = require('fs');

class JsonToCsv extends Writable {
  constructor(options) {
    super(options);
    this.csvHeaderWritten = false;
    this.file = fs.createWriteStream('output.csv');
  }

  _write(chunk, encoding, callback) {
    try {
      const data = JSON.parse(chunk.toString());
      if (!this.csvHeaderWritten) {
        this.file.write(Object.keys(data).join(',') + '\n');
        this.csvHeaderWritten = true;
      }
      this.file.write(Object.values(data).join(',') + '\n');
      callback();
    } catch (err) {
      console.error('Error parsing JSON:', err);
      callback(err);
    }
  }

  _final(callback) {
    this.file.end();
    callback();
  }
}

const jsonData = '[{"name":"John","age":30},{"name":"Jane","age":25}]';
const jsonToCsv = new JsonToCsv();
jsonToCsv.write(jsonData);
jsonToCsv.end();

在这个例子中,JsonToCsv 类继承自 Writable 流。在 _write 方法中,我们将接收到的 JSON 数据解析为对象,然后将其转换为 CSV 格式并写入文件。_final 方法在流结束时关闭文件。

网络请求数据发送

在进行网络请求时,有时需要发送大量数据。使用 Writable 流可以将数据分块逐步发送,而不是一次性加载所有数据到内存中。

const { Writable } = require('stream');
const http = require('http');

class HttpDataSender extends Writable {
  constructor(url, options) {
    super(options);
    this.url = url;
    this.request = http.request(url, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json'
      }
    }, (response) => {
      console.log('Response status code:', response.statusCode);
      response.on('data', (chunk) => {
        console.log('Received response data:', chunk.toString());
      });
      response.on('end', () => {
        console.log('Response ended');
      });
    });
    this.request.on('error', (err) => {
      console.error('Request error:', err);
    });
  }

  _write(chunk, encoding, callback) {
    this.request.write(chunk);
    callback();
  }

  _final(callback) {
    this.request.end();
    callback();
  }
}

const dataToSend = '{"message":"This is some data to send"}';
const sender = new HttpDataSender('http://localhost:3000/api', {});
sender.write(dataToSend);
sender.end();

在这个示例中,HttpDataSender 类继承自 Writable 流。在 _write 方法中,我们将接收到的数据块写入 HTTP 请求。_final 方法在流结束时结束 HTTP 请求。

优化与性能考虑

缓冲区大小调整

Writable 流的缓冲区大小会影响性能。默认情况下,缓冲区大小是 16384 字节(16KB)。如果处理的数据块较大或者写入速度较快,可以适当增大缓冲区大小,以减少 drain 事件的触发频率。

const { Writable } = require('stream');

const myWritable = new Writable({
  highWaterMark: 32768, // 将缓冲区大小设置为 32KB
  write(chunk, encoding, callback) {
    console.log('Received chunk:', chunk.toString());
    callback();
  }
});

在这个示例中,我们通过 highWaterMark 选项将缓冲区大小设置为 32KB。

背压处理

在处理高流量数据时,背压处理非常关键。当可写流处理数据的速度慢于可读流产生数据的速度时,就会出现背压问题。为了处理背压,需要正确监听 drain 事件,并在 write() 方法返回 false 时暂停可读流。

const { Readable, Writable } = require('stream');

const readable = new Readable({
  read() {
    for (let i = 0; i < 1000; i++) {
      this.push('A'.repeat(1000));
    }
    this.push(null);
  }
});

const writable = new Writable({
  write(chunk, encoding, callback) {
    // 模拟缓慢的写入操作
    setTimeout(() => {
      console.log('Received chunk:', chunk.toString());
      callback();
    }, 100);
  }
});

readable.on('data', (chunk) => {
  const writeResult = writable.write(chunk);
  if (!writeResult) {
    console.log('Buffer is full, pausing readable stream...');
    readable.pause();
    writable.once('drain', () => {
      console.log('Buffer has drained, resuming readable stream...');
      readable.resume();
    });
  }
});

readable.on('end', () => {
  writable.end();
});

在这个例子中,我们模拟了一个高流量的可读流和一个缓慢的可写流。当 write() 方法返回 false 时,我们暂停可读流,直到 drain 事件触发再恢复可读流。

性能测试与分析

为了确保 Writable 流在应用中的性能表现良好,可以使用性能测试工具进行测试。例如,使用 benchmark 模块来比较不同缓冲区大小、不同写入策略下的性能差异。

const Benchmark = require('benchmark');
const { Writable } = require('stream');

const suite = new Benchmark.Suite;

const writable1 = new Writable({
  highWaterMark: 16384,
  write(chunk, encoding, callback) {
    callback();
  }
});

const writable2 = new Writable({
  highWaterMark: 32768,
  write(chunk, encoding, callback) {
    callback();
  }
});

const dataChunk = 'A'.repeat(1000);

suite
 .add('Default buffer size (16KB)', function() {
    writable1.write(dataChunk);
  })
 .add('Increased buffer size (32KB)', function() {
    writable2.write(dataChunk);
  })
  // 每个测试完成时的回调
 .on('cycle', function(event) {
    console.log(String(event.target));
  })
  // 所有测试完成时的回调
 .on('complete', function() {
    console.log('Fastest is'+ this.filter('fastest').map('name'));
  })
  // 运行测试
 .run({ 'async': true });

在这个示例中,我们使用 benchmark 模块来比较默认缓冲区大小(16KB)和增大缓冲区大小(32KB)下的写入性能。通过这种方式,可以找到最适合应用场景的配置。

错误处理

Writable 流的使用过程中,错误处理至关重要。可能会出现各种错误,比如写入文件失败、网络请求错误等。

处理写入错误

当在 _write 方法中发生错误时,需要通过 callback 传递错误信息。

const { Writable } = require('stream');
const fs = require('fs');

class ErrorProneWriter extends Writable {
  constructor(options) {
    super(options);
    this.file = fs.createWriteStream('error_prone_file.txt');
  }

  _write(chunk, encoding, callback) {
    // 模拟一个写入错误
    const shouldError = Math.random() > 0.5;
    if (shouldError) {
      const err = new Error('Simulated write error');
      callback(err);
    } else {
      this.file.write(chunk, callback);
    }
  }

  _final(callback) {
    this.file.end();
    callback();
  }
}

const writer = new ErrorProneWriter();

writer.on('error', (err) => {
  console.error('Write error:', err);
});

writer.write('Some data');
writer.end();

在这个示例中,我们模拟了一个可能会出现写入错误的情况。如果发生错误,通过 callback 传递错误,然后在 error 事件中捕获并处理错误。

处理流结束错误

_final 方法中也可能发生错误,比如关闭文件时出错。同样需要通过 callback 传递错误信息。

const { Writable } = require('stream');
const fs = require('fs');

class FinalErrorWriter extends Writable {
  constructor(options) {
    super(options);
    this.file = fs.createWriteStream('final_error_file.txt');
  }

  _write(chunk, encoding, callback) {
    this.file.write(chunk, callback);
  }

  _final(callback) {
    // 模拟一个流结束时的错误
    const shouldError = Math.random() > 0.5;
    if (shouldError) {
      const err = new Error('Simulated final error');
      callback(err);
    } else {
      this.file.end(callback);
    }
  }
}

const finalWriter = new FinalErrorWriter();

finalWriter.on('error', (err) => {
  console.error('Final error:', err);
});

finalWriter.write('Some data');
finalWriter.end();

在这个例子中,我们模拟了流结束时可能出现的错误,并在 error 事件中进行处理。

与其他流类型的协同工作

与可读流配合使用

Writable 流经常与 Readable 流一起使用,通过 pipe 方法实现数据的高效传输。

const { Readable, Writable } = require('stream');

const readable = new Readable({
  read() {
    this.push('Hello, ');
    this.push('world!');
    this.push(null);
  }
});

const writable = new Writable({
  write(chunk, encoding, callback) {
    console.log('Received chunk:', chunk.toString());
    callback();
  }
});

readable.pipe(writable);

在这个简单的示例中,可读流产生的数据通过 pipe 方法自动传输到可写流。

与转换流配合使用

转换流(Transform)可以在数据从可读流流向可写流的过程中对数据进行转换。Writable 流可以与转换流协同工作。

const { Readable, Writable, Transform } = require('stream');

const readable = new Readable({
  read() {
    this.push('Hello, ');
    this.push('world!');
    this.push(null);
  }
});

const transform = new Transform({
  transform(chunk, encoding, callback) {
    const upperCaseChunk = chunk.toString().toUpperCase();
    this.push(upperCaseChunk);
    callback();
  }
});

const writable = new Writable({
  write(chunk, encoding, callback) {
    console.log('Received chunk:', chunk.toString());
    callback();
  }
});

readable.pipe(transform).pipe(writable);

在这个示例中,可读流的数据先经过转换流,将数据转换为大写,然后再传输到可写流。

总结与最佳实践

  1. 合理设置缓冲区大小:根据应用场景和数据量,适当调整 highWaterMark 选项,以优化性能。
  2. 正确处理背压:在处理高流量数据时,要正确监听 drain 事件,暂停和恢复可读流,以避免数据堆积。
  3. 完善错误处理:在 _write_final 方法中,要通过 callback 传递错误信息,并在 error 事件中处理错误。
  4. 与其他流类型协同工作:充分利用 Readable 流、Transform 流与 Writable 流的配合,实现高效的数据处理和传输。

通过深入理解和应用 Writable 流的高级特性,可以在 Node.js 应用开发中更好地处理数据输出,提高应用的性能和稳定性。无论是日志记录、数据转换还是网络请求数据发送,Writable 流都能提供强大而灵活的解决方案。在实际应用中,要根据具体需求,结合上述最佳实践,充分发挥 Writable 流的优势。