Node.js读写流Stream

创建于 2024年7月23日修改于 2024年7月23日
Node.jsStream

用 Node.js 应用程序处理大型数据集是一把双刃剑。它处理大量数据非常方便,但也很可能遇到性能瓶颈,甚至内存耗尽。一般情况上,开发人员通过一次性将整个数据集读入内存来解决这个问题。这种方法对较小的数据集来说是符合直觉的,但对于大文件来说显得很低效。

这时 Node.js 的流(streams)就派上用场了。流提供了一种完全不同的方法,使你可以逐步处理数据并优化内存使用。通过以可管理的块来处理数据,流使你能够构建可扩展的应用程序,能够高效地处理即使是最艰巨的数据集。正如流行的一句话所说,“流是随时间变化的数组”。在本文中,我们将探讨 Node.js 流的核心概念。你将彻底了解如何利用流进行实时数据处理、内存管理和构建可扩展的应用程序。

在本文中你将学习如何创建和管理可读流和可写流,并处理背压(backpressure)和错误管理。

Contents

什么是 Node.js 流?

Node.js 流为管理应用程序中的数据流提供了强大的抽象。它们在处理大型数据集(如视频或实时传输)方面表现出色而不会影响性能。

这种方法不同于传统的方法,后者会一次性将整个数据集加载到内存中。流按块处理数据,显著减少了内存使用。Node.js 中的所有流都继承自 EventEmitter 类,使它们能够在数据处理的各个阶段发出事件。这些流可以是可读的、可写的或两者兼具,提供了不同数据处理场景的灵活性。

事件驱动架构

Node.js 在事件驱动架构上表现出色,非常适合实时 I/O。这意味着当输入数据可用时立即消费,并在应用程序生成输出时立即发送。流与这种方法无缝集成,允许连续的数据处理。

它们通过在关键阶段发出事件来实现这一点。这些事件包括接收到数据(data 事件)和流完成(end 事件)的信号。开发人员可以监听这些事件并相应地执行自定义逻辑。这种事件驱动的特性使流在处理来自外部源的实时数据方面非常高效。

为什么使用流?

与其他数据处理方法相比,流具有三个关键优势:

这些优势使流成为构建高性能、可扩展的 Node.js 应用程序的强大工具,特别是在处理大型数据集或实时数据处理时。

注意:
如果你的应用程序已经将所有数据都存储在内存中,使用流可能会增加不必要的开销并降低应用程序速度。在这种情况下,传统方法可能更有效。

控制流动:流中的背压 Backpressure

在 Node.js 流中,背压是管理数据流从生产者(数据源)到消费者(数据目的地)的关键机制,特别是在处理大型数据集时。它确保生产者不会压倒消费者,从而避免崩溃、数据丢失或性能问题。

想象一下消防水管——背压就像一个阀门,调节水流以防止消费者(在这种情况下是握住水管的人)被水流冲走。

关键点是:计算机的内存是有限的。如果没有背压,生产者(如消防水管)可以比消费者(处理数据的程序)更快地发送数据,从而存储数据在内存中。这会导致两个问题:

  1. 数据丢失:如果数据到达速度超过了消费者的处理速度,某些数据可能会被丢弃或完全丢失。

  2. 性能问题:消费者可能难以跟上数据的输入速度,导致处理缓慢、崩溃或高内存使用。

背压通过创建受控的流动来防止这种情况发生。消费者向生产者发出信号,要求其在达到内存容量时减慢数据流。这确保了数据按可管理的块处理,避免内存过载和数据丢失。

Backpressure 如何工作

背压的主要目标是为消费者提供节流生产者的机制,确保数据流不会超过消费者的处理能力。这是通过受控的缓冲系统实现的,其中流会暂时保存数据,直到消费者准备好处理更多数据。

在 Node.js 流中,这种缓冲和流量控制通过一个名为“highWaterMark”的内部属性进行管理。highWaterMark 指定了在暂停额外的数据读取或写入之前,内部缓冲区中可以缓冲的最大字节数(对于字节流)或对象数(对于对象流)。

缓冲和流量控制

以下是缓冲和流量控制如何管理背压的简要概述:

这种背压机制确保了流通过管道的数据受到调节,防止消费者被压垮,并确保平滑、高效和可靠的数据处理。

Demo 先决要求

确保你满足以下要求以便跟随下文代码:

nvm install 20
nvm use 20
mkdir workshop
cd workshop
git clone https://github.com/mcollina/streams-training.git

使用可读流 Readable Streams

Readable 是我们用于顺序读取数据源的类。

关键方法和事件

可读流通过几个核心方法和事件操作,允许对数据处理进行细致控制:

基础可读流

以下是一个简单的可读流实现示例,它动态生成数据:

const { Readable } = require('stream');

class MyStream extends Readable {
  #count = 0;
  _read(size) {
    this.push(':-)');
    if (this.#count++ === 5) { this.push(null); }
  }
}

const stream = new MyStream();

stream.on('data', chunk => {
  console.log(chunk.toString());  
});

在此代码中,MyStream 类继承 Readable 并覆盖其 _read 方法。它将字符串 ":-)" 推入内部 buffer 中。在推入5次后,它通过推入 null 来表示流结束。on('data') 事件处理器将其接收到的每一个 chunk 都记录到控制台中。

输出

:-)
:-)
:-)
:-)
:-)
:-)

用 Pause 和 Resume 管理背压

可以通过应用程序处理数据的能力来暂停和恢复流来处理背压。以下是如何实现这一点:

const stream = new MyStream();

stream.on('data', chunk => {
  console.log(chunk.toString());
  stream.pause();  // 暂停接收数据以模拟处理延迟
  setTimeout(() => {
    stream.resume();  // 1000毫秒后恢复
  }, 1000);
});

在这个设置中,每次接收到数据时,流都会暂停,并设置一个超时以在一秒钟后恢复流。这模拟了处理每个数据块需要时间的场景,从而通过使用 pause()resume() 控制数据流来演示基本的背压管理。

使用 “readable” 事件的高级控制

为了对数据流进行更精细的控制,可以使用 readable 事件。这个事件更复杂,但通过允许显式控制何时从流中读取数据,为某些应用程序提供了更好的性能:

const stream = new MyStream({
  highWaterMark: 1
});

stream.on("readable", () => {
  console.count(">> readable event");
  let chunk;
  while ((chunk = stream.read()) !== null) {
    console.log(chunk.toString());  // 处理数据块
  }
});
stream.on("end", () => console.log('>> end event'));

在这里,使用 readable 事件根据需要手动从流中提取数据。readable 事件处理程序内部的循环继续从流缓冲区读取数据,直到它返回 “null”,表示缓冲区暂时为空或流已结束。将 highWaterMark 设置为 1 保持缓冲区大小较小,触发 readable 事件更频繁,并允许对数据流进行更细粒度的控制。

>> readable event: 1
:-)
>> readable event: 2
:-)
>> readable event: 3
:-)
>> readable event: 4
:-)
>> readable event: 5
:-)
>> readable event: 6
:-)
>> readable event: 7
>> end event

异步迭代器

异步迭代器是我们推荐的标准方式来建模流数据。与 Web 和 Node.js 上的所有流原语相比,异步迭代器更容易理解和使用。在最近的 Node.js 版本中,异步迭代器作为一种更优雅和可读的方式来与流交互。基于事件的基础,异步迭代器提供了更高层次的抽象,简化了流的消费。

在 Node.js 中,所有可读流都是异步可迭代的。这意味着你可以使用 for await...of 语法在流数据变得可用时循环处理每一块数据,以异步代码的效率和简洁性来处理每一块数据。

以下是一个使用异步迭代器与可读流的示例:

import { setTimeout as sleep } from 'timers/promises';

async function* generate() {
  yield 'hello';
  await sleep(10);  // 模拟延迟
  yield ' ';
  await sleep(10);  // 模拟另一个延迟
  yield 'world';
}

Readable.from(generate()).on('data', chunk => console.log(chunk));

在这个示例中,generate 函数是一个异步生成器,它在之间有意地暂停,生成三块数据,使用 await sleep(10)。这模拟了异步操作的行为,如 API 调用或数据库查询,在数据可用之间有显著的延迟。

Readable.from() 方法用于将这个生成器转换为一个可读流。这个流会在每块数据变得可用时发出,并且我们使用 on('data') 事件来处理它并记录到控制台。运行此代码的输出将是:

hello
world

使用异步迭代器与流的优势

使用异步迭代器与流简化了异步数据流的处理,具有以下几个方面的优势:

异步迭代器提供了一种更现代且通常更可读的方式来处理可读流,尤其是在处理异步数据源或你更喜欢基于循环的顺序数据处理方法时。

使用可写流 Writable Streams

可写流对于创建文件、上传数据或涉及顺序输出数据的任何任务都很有用。当可读流提供数据源时,Node.js 中的可写流作为数据的目标。

可写流的关键方法和事件

创建一个可写流

以下是创建一个可写流的示例,该流将所有传入数据转换为大写后再写入标准输出:

import { Writable } from 'stream';
import { once } from 'events';

class MyStream extends Writable {
  constructor() {
    super({ highWaterMark: 10 /* 10 字节 */ });
  }
  _write(data, encode, cb) {
    process.stdout.write(data.toString().toUpperCase() + '\n', cb);
  }
}
const stream = new MyStream();

for (let i = 0; i < 10; i++) {
  const waitDrain = !stream.write('hello');

  if (waitDrain) {
    console.log('>> wait drain');
    await once(stream, 'drain');
  }
}

stream.end('world');

在这段代码中,MyStream 是一个自定义的可写流,具有 10 字节的缓冲容量(highWaterMark)。它重写了 _write 方法,在写出数据之前将其转换为大写。

这个循环尝试向流写入十次 hello。如果缓冲区填满(waitDrain 变为 true),它将在继续之前等待 drain 事件,确保我们不会使流的缓冲区超载。

输出将是:

HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
WORLD

如何避免通过 HTTP 流式传输文件

在处理流时,特别是在 HTTP 上下文中,必须注意潜在的问题。让我们看一个如何不要通过 HTTP 流式传输文件的示例:

const { createReadStream } = require('fs');
const { createServer } = require('http');


const server = createServer((req, res) => {
  createReadStream(__filename).pipe(res);
});
server.listen(3000);

虽然看起来很简单,但这段代码有一个隐藏的危险。如果在文件读取过程中发生错误(例如,文件不存在),错误可能不会被正确处理,从而可能导致服务器崩溃。

管道流中的错误处理

处理流中的错误,特别是在管道时,可能很复杂,因为流管道的任何部分中的错误都可能影响整个过程。以下是一个示例:

const fs = require('fs');
const { Transform } = require('stream');

const upper = new Transform({
  transform: function (data, enc, cb) {
    this.push(data.toString().toUpperCase());
    cb();
  }
});

fs.createReadStream(__filename)
  .pipe(upper)
  .pipe(process.stdout);

在这种情况下,如果在任何阶段(读取、转换或写入)发生错误,可能不会被捕获,导致意外行为。使用 stream/promises 模块中的 pipeline() 函数等技术可以更有效地管理这些错误。

将异步迭代器与管道结合使用

将管道与异步迭代器结合使用提供了一种强大的方式来处理流转换,具有最小的缓冲:

import fs from 'fs';
import { pipeline } from 'stream/promises';

await pipeline(
  fs.createReadStream(import.meta.filename),
  async function* (source) {
    for await (let chunk of source) {
      yield chunk.toString().toUpperCase();
    }
  },
  process.stdout
);

此代码实现了与前一个示例相同的结果,但具有更好的错误处理。pipeline 函数自动管理管道中的错误,简化了我们的代码,并使其免受内存过载问题的影响。

通过结合流的效率、背压机制的控制和异步迭代器的优雅性,你可以构建能够优雅高效地处理大量数据的高性能应用程序。

总结

流提供了一种强大而高效的方式来管理数据,特别是对于大型数据集、实时场景和内存优化。

本文探索了 Node.js 流的基本概念,了解了可读流和可写流、背压、错误处理以及异步迭代器等高级技术。

原文链接:https://blog.platformatic.dev/a-guide-to-reading-and-writing-nodejs-streams