stream 释义
- stream 是水流,但默认没有水
- stream.write 可以让水流中有水(数据)
- 每次写的小数据叫做 chunk (块)
- 产生数据的一段叫做 source (源头)
- 得到数据的一段叫做 sink (水池)
用 stream 和不用 stream 传输大文件时,node 占用内存区别很大
用 stream 时,读一个 150m 的文件,基本不会高于 30m
但是不用 stream,内存占用 100m+,要知道分配给nodeJs的内存有限,所以用 stream 就可以一点一点慢慢传
管道 释义
- 两个流可以用一个管道相连
- stream1 的末尾连接上 stream2 的开端
- 只要 stream1 有数据,就会流到 stream2
pipe 也可以通过事件实现
1 2 3 4 5 6 7 8
| stream1.on('data', chunk => { stream2.write(chunk) })
stream1.on('end', () => { stream2.end() })
|
Stream 对象的原型链
栗子
- 自身属性(由fs.Readable.prototype 构造)
- 原型:stream.Readable.peototype
- 二级原型: stream.Stream.prototype
- 三级原型:events.EventEmitter.prototype
- 四级原型:Object.prototype
- Stream 对象都继承了 EventEmitter
支持的事件和方法
Stream 分类
- Readable 可读
- Writeable 可写
- Duplex 可读可写(双向)(读写一般不交叉)
- Transform 可读可写(变化)(自己写,自己读,webpack常用,比如把 scss 转换成 css,es6 编译成 es5)
手动实现一个 Readable 的流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| const { Readable } = require('stream')
// 先推再读 // const inStream = new Readable()
// inStream.push('zch') // inStream.push('233333333333333333') // inStream.push(null) // no more data
// // inStream.pipe(process.stdout) // // 等价于 // inStream.on('data', chunk => { // console.log('push') // console.log(chunk.toString()) // })
// 等读了再推 const inStream = new Readable({ read(size) { const char = String.fromCharCode(this.charCode++) console.log('push\n') this.push(char) if (this.charCode > 90) { this.push(null) } } })
inStream.charCode = 65
inStream.pipe(process.stdout)
|
手动实现一个 writable 的流
1 2 3 4 5 6 7 8 9 10
| const { Writable } = require('stream')
const outStream = new Writable({ write(chunk, encoding, callback) { console.log(chunk.toString()) callback() } })
process.stdin.pipe(outStream)
|
手动实现一个 duplex 的流
1
| // 只需要把 writable 和 readable 结合起来,同事实现 read 和 write 方法即可
|
1 2 3 4 5 6 7 8 9
| const { Transform } = require('stream') const upperTransform = new Transform({ transform (chunk, encoding, callback) { this.push(chunk.toString().toUpperCase()) callback() } })
process.stdin.pipe(upperTransform).pipe(process.stdout)
|
1 2 3 4 5 6 7
| const fs = require('fs') const zlib = require('zlib') const file = process.argv[2]
fs.createReadStream(file) .pipe(zlib.createGzip()) .pipe(fs.createWriteStream(file + '.gz'))
|
升级(加点进度条)
1 2 3 4 5 6 7 8 9
| const fs = require('fs') const zlib = require('zlib') const file = process.argv[2]
fs.createReadStream(file) .pipe(zlib.createGzip()) .on('data', () => console.log('.')) .pipe(fs.createWriteStream(file + '.gz')) .on('finish', () => console.log('Done'))
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| const fs = require('fs') const zlib = require('zlib') const file = process.argv[2]
const { Transform } = require('stream') const transform = new Transform({ transform (chunk, encoding, callback) { console.log('.') callback(null, chunk) } })
fs.createReadStream(file) .pipe(zlib.createGzip()) .pipe(transform) // 这里可以做进度条,webpack 就是这样的原理,vue-loader => scss-loader => css-loader => style-loader .pipe(fs.createWriteStream(file + '.gz')) .on('finish', () => console.log('Done'))
|
再次升级(我们还可以加密内容)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| const fs = require('fs') const zlib = require('zlib') const file = process.argv[2] const crypto = require('crypto')
const { Transform } = require('stream') const transform = new Transform({ transform (chunk, encoding, callback) { console.log('.') callback(null, chunk) } })
fs.createReadStream(file) .pipe(crypto.createCipher('aes192', '123456')) .pipe(zlib.createGzip()) .pipe(transform) // 这里可以做进度条,webpack 就是这样的原理,vue-loader => scss-loader => css-loader => style-loader .pipe(fs.createWriteStream(file + '.gz')) .on('finish', () => console.log('Done'))
|
Stream 用途广泛
参考
文档
面试题