stream 释义

  • stream 是水流,但默认没有水
  • stream.write 可以让水流中有水(数据)
  • 每次写的小数据叫做 chunk (块)
  • 产生数据的一段叫做 source (源头)
  • 得到数据的一段叫做 sink (水池)

实用栗子

用 stream 和不用 stream 传输大文件时,node 占用内存区别很大
用 stream 时,读一个 150m 的文件,基本不会高于 30m
但是不用 stream,内存占用 100m+,要知道分配给nodeJs的内存有限,所以用 stream 就可以一点一点慢慢传

管道 释义

  • 两个流可以用一个管道相连
  • stream1 的末尾连接上 stream2 的开端
  • 只要 stream1 有数据,就会流到 stream2

pipe 也可以通过事件实现

1
2
3
4
5
6
7
8
// stream1 有数据就塞给 stream2
stream1.on('data', chunk => {
stream2.write(chunk)
})
// stream1 停了,就停掉 stream2
stream1.on('end', () => {
stream2.end()
})

Stream 对象的原型链

栗子

  • 自身属性(由fs.Readable.prototype 构造)
  • 原型:stream.Readable.peototype
  • 二级原型: stream.Stream.prototype
  • 三级原型:events.EventEmitter.prototype
  • 四级原型:Object.prototype
  • Stream 对象都继承了 EventEmitter

支持的事件和方法

Stream 分类

  • Readable 可读
  • Writeable 可写
  • Duplex 可读可写(双向)(读写一般不交叉)
  • Transform 可读可写(变化)(自己写,自己读,webpack常用,比如把 scss 转换成 css,es6 编译成 es5)

手动实现一个 Readable 的流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
const { Readable } = require('stream')

// 先推再读
// const inStream = new Readable()

// inStream.push('zch')
// inStream.push('233333333333333333')
// inStream.push(null) // no more data

// // inStream.pipe(process.stdout)
// // 等价于
// inStream.on('data', chunk => {
// console.log('push')
// console.log(chunk.toString())
// })


// 等读了再推
const inStream = new Readable({
read(size) {
const char = String.fromCharCode(this.charCode++)
console.log('push\n')
this.push(char)
if (this.charCode > 90) {
this.push(null)
}
}
})

inStream.charCode = 65

inStream.pipe(process.stdout)

手动实现一个 writable 的流

1
2
3
4
5
6
7
8
9
10
const { Writable } = require('stream')

const outStream = new Writable({
write(chunk, encoding, callback) {
console.log(chunk.toString())
callback()
}
})

process.stdin.pipe(outStream)

手动实现一个 duplex 的流

1
// 只需要把 writable 和 readable 结合起来,同事实现 read 和 write 方法即可

手动实现一个 transform 的流

1
2
3
4
5
6
7
8
9
const { Transform } = require('stream')
const upperTransform = new Transform({
transform (chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase())
callback()
}
})

process.stdin.pipe(upperTransform).pipe(process.stdout)
transform 的流的栗子(实现gzip):
1
2
3
4
5
6
7
const fs = require('fs')
const zlib = require('zlib')
const file = process.argv[2]

fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream(file + '.gz'))
升级(加点进度条)
1
2
3
4
5
6
7
8
9
const fs = require('fs')
const zlib = require('zlib')
const file = process.argv[2]

fs.createReadStream(file)
.pipe(zlib.createGzip())
.on('data', () => console.log('.'))
.pipe(fs.createWriteStream(file + '.gz'))
.on('finish', () => console.log('Done'))
再次升级(升级为一个 transform 方法)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
const fs = require('fs')
const zlib = require('zlib')
const file = process.argv[2]

const { Transform } = require('stream')
const transform = new Transform({
transform (chunk, encoding, callback) {
console.log('.')
callback(null, chunk)
}
})


fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(transform) // 这里可以做进度条,webpack 就是这样的原理,vue-loader => scss-loader => css-loader => style-loader
.pipe(fs.createWriteStream(file + '.gz'))
.on('finish', () => console.log('Done'))
再次升级(我们还可以加密内容)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
const fs = require('fs')
const zlib = require('zlib')
const file = process.argv[2]
const crypto = require('crypto')

const { Transform } = require('stream')
const transform = new Transform({
transform (chunk, encoding, callback) {
console.log('.')
callback(null, chunk)
}
})


fs.createReadStream(file)
.pipe(crypto.createCipher('aes192', '123456'))
.pipe(zlib.createGzip())
.pipe(transform) // 这里可以做进度条,webpack 就是这样的原理,vue-loader => scss-loader => css-loader => style-loader
.pipe(fs.createWriteStream(file + '.gz'))
.on('finish', () => console.log('Done'))

Stream 用途广泛

参考
文档
面试题