Node.js-Stream学习记录

什么是 Stream(流)

先看一下 Node 官网中对于 Stream 的介绍

A stream is an abstract interface for working with streaming data in Node.js.

其实我们写的 Node.js 代码经常会用到流。

来看一段代码

// src/test.mjs
import http from 'node:http';
import fs from 'node:fs';
import path from 'node:path';

const server = http.createServer(async function (req, res) {
  const data = fs.readFileSync(
    path.join(import.meta.dirname, '..', 'package.json'),
    'utf-8'
  );
  res.end(data);
});

server.listen(8000);

我们跑了个 http 服务。

用 fs.readFileSync 读取项目中 package.json 的内容返回。

启动服务后使用 curl 访问下

curl -i http://localhost:8000

image

因为是全部读完返回的,所以可以知道 Content-Length,也就是响应体的长度。

当文件比较小的时候,这样读取、返回没啥问题。

那如果文件非常大呢?

比如有好几百 M,这时候全部读取完再返回是不是就合适了?

因为要等好久才能读取完文件,之后才有响应。

这就需要用到流了:

//更新 src/test.mjs
import http from 'node:http';
import fs from 'node:fs';
import path from 'node:path';

const server = http.createServer(async function (req, res) {
  const readStream = fs.createReadStream(
    path.join(import.meta.dirname, '..', 'package.json'),
    'utf-8'
  );
  readStream.pipe(res);
});

server.listen(8000);

image

结果一样,但是因为现在是流式返回的,并不知道响应体的 Content-Length。

所以是用 Transfer-Encoding: chunked 的方式返回流式内容。

从服务器下载一个文件的时候,如何知道文件下载完了呢?

有两种方式:

一种是 header 里带上 Content-Length,浏览器下载到这个长度就结束。

image

另一种是设置 transfer-encoding:chunked,它是不固定长度的,服务器不断返回内容,直到返回一个空的内容代表结束。

比如这样:

5
Hello
1
,
5
World
1
!
0

这里分了 “Hello” “,” “World”“!” 这 4 个块,长度分别为 5、1、5、1

最后以一个长度为 0 的块代表传输结束。

这样,不管内容多少都可以分块返回,就不用指定 Content-Length 了。

这就是大文件的流式传输的原理,就是 transfer-encoding:chunked。

当然,这是 http 传输时的流,在用 shell 命令的时候,也经常会用到流:

比如

ls | grep pack

image

ls 命令的输出流,作为 grep 命令的输入流。

当然,我们也可以把 grep 命令的输出流,作为 node 脚本的输入流。

//src/read.mjs
process.stdin.on('readable', function () {
    const buf = process.stdin.read();
    console.log(buf?.toString('utf-8'));
});

process.stdin 就是输入流,监听 readable 事件,用 read 读取数据。

执行一下

ls | grep pack | node src/read.mjs

image

可以看到,我们的 node 脚本接收到了 grep 的输出流作为输入流。

这就是管道 pipe 的含义。

综上,可以小结下我们对流的认识:

流就是分段的传输内容,比如从服务端像浏览器返回响应数据的流,读取文件的流等。

流和流之间可以通过管道 pipe 连接,上个流的输出作为下个流的输入。

流的类型

在 node 里,流一共有 4 种:可读流 Readable、可写流 Writable、双工流 Duplex、转换流 Transform。

import stream from 'node:stream';

// 可读流
const Readable = stream.Readable;
// 可写流
const Writable = stream.Writable;
// 双工流
const Duplex = stream.Duplex;
// 转换流
const Transform = stream.Transform;

其余的流都是基于这 4 种流封装出来的。

Readable

Readable 要实现 _read 方法,通过 push 返回具体的数据。

//readable.mjs
import { Readable } from 'node:stream';

const readableStream = new Readable();

readableStream._read = function() {
    this.push('阿门阿前一棵葡萄树,');
    this.push('阿东阿东绿的刚发芽,');
    this.push('阿东背着那重重的的壳呀,');
    this.push('一步一步地往上爬。')
    this.push(null);
}

readableStream.on('data', (data)=> {
    console.log(data.toString())
});

readableStream.on('end', () => {
    console.log('done');
});

当 push 一个 null 时,就代表结束流。

执行一下

node src/readable.mjs

image

创建 Readable 流也可以通过继承的方式:

// src/readable2.mjs
import { Readable } from 'node:stream';

class ReadableDong extends Readable {

    _read() {
        this.push('阿门阿前一棵葡萄树,');
        this.push('阿东阿东绿的刚发芽,');
        this.push('阿东背着那重重的的壳呀,');
        this.push('一步一步地往上爬。')
        this.push(null);
    }

}

const readableStream = new ReadableDong();

readableStream.on('data', (data)=> {
    console.log(data.toString())
});

readableStream.on('end', () => {
    console.log('done');
});

image

可读流是生成内容的,那么很自然可以和生成器结合:

// src/readable3.mjs

import { Readable } from 'node:stream';

class ReadableDong extends Readable {

    constructor(iterator) {
        super();
        this.iterator = iterator;
    }

    _read() {
        const next = this.iterator.next();
        if(next.done) {
            return this.push(null);
        } else {
            this.push(next.value)
        }
    }

}

function *songGenerator() {
    yield '阿门阿前一棵葡萄树,';
    yield '阿东阿东绿的刚发芽,';
    yield '阿东背着那重重的的壳呀,';
    yield '一步一步地往上爬。';
}

const songIterator = songGenerator();

const readableStream = new ReadableDong(songIterator);

readableStream.on('data', (data)=> {
    console.log(data.toString())
});

readableStream.on('end', () => {
    console.log('done');
});
  • 和 yield 是 js 的 generator 的语法,它是异步返回 yield 后的内容,通过 iterator 的 next 来取下一个。

image

我们封装个工厂方法:

function createReadStream(iterator) {
  return new ReadableDong(iterator);
}

const readableStream = createReadStream(songIterator);

readableStream.on('data', (data) => {
  console.log(data.toString());
});

readableStream.on('end', () => {
  console.log('done');
});

是不是就和 fs.createReadStream 很像了?

// 创建 src/fsReadStream.mjs
import fs from 'node:fs';
import path from 'node:path';

const readStream = fs.createReadStream(
  path.join(import.meta.dirname, '..', 'package.json'),
  'utf-8'
);

readStream.on('data', (data) => {
  console.log(data.toString());
});

readStream.on('end', () => {
  console.log('done');
});

image

其实文件的 ReadStream 就是基于 stream 的 Readable 封装出来的。

这就是可读流。

http 服务的 request 就是 Readable 的实例:

image

所以我们可以这样写:

// src/test2.mjs
import http from 'node:http';
import fs from 'node:fs';

const server = http.createServer(async function (req, res) {
    const writeStream = fs.createWriteStream('aaa.txt', 'utf-8');
    req.pipe(writeStream);
    res.end('done');
});

server.listen(8000);

启动服务后访问一下

curl -X POST -d "a=1&b=2" http://localhost:8000

image

可以看到,从 request 的流中读出的内容写入了文件的 WriteStream

Writable