参考
- 《Node 设计模式2》
1. 什么是流?
不同于缓冲,流可以读取数据的一部分同时立即提供给后序处理,这提高了内存的空间效率和程序运行的时间效率。
缓冲的内存空间问题:缓冲是一次性读取所有数据到内存,然后才能交给处理程序,如果文件数据过大比如由几百MB甚至几百GB,那么读取整个文件到缓冲,然后再一次下返回,那么可能会导致内存溢出(比如说在 V8 的缓存区不能超过1GB)。缓冲的运行时间问题:只有在文件数据被完整读取到内存才能进行后续处理,比如说 如果使用同步读取的方法读取一个文件,当文件过大就会阻塞程序运行,虽然使用异步方法读取文件不会阻塞,但是仍然会把文件整个读到内存中。复制代码
使用流可以缓解缓冲的以上问题:流可以实现一些无法通过缓存数据并一次性处理来实现的功能(比如在处理大文件压缩时,压缩之前要先读取文件,而文件太大,内存会溢出,如果使用流,那么可以通过流的管道在读文件的同时对流中的数据块进行压缩),除了避免内存溢出的空间问题,流还允许一旦读到文件的数据块就可以尽快处理数据,这比缓冲一次性读取整个文件再进行处理执行的速度会更快。
2. 流的分类
3. 流的特点
-
组合性 :
流还可以通过管道
pipe()
组合起来,管道中的每个独立单元只实现单一的功能,(像中间件?),管道中的上一个处理单元的输出流作为下一个单元的输入,前提是管道中的下一个流必须支持上一个流的输出数据类型。流的可组合性不仅使流可以用来处理前面的纯 I/O 问题,也可以用来对代码进行 简化 和 模块化处理。使用pipe()
还可以避免手动解决背压问题,因为管道会自动处理。 -
流的操作模式 :
流的操作模式有 对象模式 和 二进制模式 。 当创建流时,可以使用 objectMode 选项把流实例切换到对象模式。在该模式下,流中的数据可以看作是一系列独立的对象,通过设置 objectMode 参数使得流可以接受任何 JavaScript 对象。 在二进制模式下,流中的数据是以块的形式存在的,比如缓冲或者字符串.
-
流的背压 :
在可读流中,数据写入缓冲的速度大于被读出的速度,那么缓冲会积聚大量数据,占用大量的内存。如果使用管道pipe()方法,那么不用自己解决背压问题。
4. 使用流:
首先要创建流,可以使用 Node 内置的的流接口,也可以通过继承创建新的流类,或者直接使用第三方模块。
5. 流的使用场景:
(1)处理I/O:
比如使用流对读入数据立即进行压缩和发送,不用等待所有数据读入缓存之后才能压缩和发送。 利用流的可组合性使用管道对数据进行各种变化,不仅是压缩还可以加密。复制代码
(2)使用流进行流程控制:
如顺序执行、无序并行执行、无序有限制的并行执行、顺序并行执行。复制代码
a. 顺序执行:
比如将几个文件的内容拼接起来,要遵循文件的先后顺序,首先定义一个目标文件流,用 from2-array 这个模块创建一个文件数组,然后使用管道pipe(),在pipe()中把每一个文件以流的模式读入,并且用读入目标文件流的管道中,将每一个文件按先后顺序依次读入目标文件流,就把所有文件拼接起来了。代码:
function concatFiles(destination,files,callback){ const destStream = fs.createWriteStream(destination); fromArray.obj(files) .pipe(through.obj( ( file, enc, done ) => { const src = fs.createReadStream(file); src.pipe(destStream, { end:false}); src.on('end', done) }));} module.exports = concatFiles复制代码
在 concat.js 使用此模块:
// concat.js:const concatFiles = require('./concatFiles');concatFiles(process.argv[2], process.argv.slice(3), ()=>{ console.log('文件成功连接!!')})复制代码
运行 node concat allTogether.txt file1.txt file2.txt
将 file1.txt 、file2.txt 两个文件的内容拼接然后输入到 allTogether.txt 里。
b. 无序并行执行:
充分利用 Node.js 的并发性,在两个数据块之间没有联系时,用并列执行来提高处理速度。比如有一个文件 urlList.txt, 里面的文本是每行一个URL,现在用流的方式并行读入这些URL 来做一些处理。
定义一个并行执行的变换流类 :
const stream = require('stream');class ParallelStream extends stream.Transform { constructor(userTransform){ super({ objectMode:true}); this.userTransform = userTransform; this.running = 0; this.terminateCallback = null; } _transform(chunk, enc, done){ this.running++; this.userTransform( chunk, enc, this.push.bind(this), this._onComplete.bind(this)); done() } _flush(done){ if(this.running > 0){ this.terminateCallback = done; }else{ done(); } } _onComplete(err){ this.running--; if(err){ return this.emit('error',err); } if(this.running === 0){ this.terminateCallback && this.terminateCallback() } }}复制代码
在上面创建的变换类中,默认使用对象模式,_transform() 方法会执行传入的变换函数,而且为了实现并行,在变换函数为执行完之前调用 done() 方法,变换函数中会绑定一个回调函数 _onComplete() ;流终止时会调用 _flush() 方法,只要还有任务在运行中,就不调用done()(done 的作用是每一个流的块完成之后才能处理下一个流,是一个流的原生API),而是将 done 赋值给 termnateCallback(), 赋值这个操作延迟了 done()执行, 也就可以延迟 finish 事件触发;_onComplete() 方法在每一个异步任务完成后执行, 当所有任务完成后就调用 termnateCallback() 也就是 done() , 并且触发 finish 事件,执行 _flush() .
使用这个类:checkUrls.js:
const fs = require('fs');const split = require('split');const request = require('request');const ParallelStream = require('./parallelStream');fs.createReadStream(process.argv[2]) .pipe(split()) .pipe(new ParallelStream(( url, enc, push, done) => { if(!url) return done(); request.head(url ,(err, respond)=>{ push(url + 'is' + (err ? 'down' : 'up') + '\n'); done() }) })) .pipe(fs.createWriteStream('result.txt')) .on('finish', ()=> console.log(' 所有 URL 已经校验'))复制代码
运行 node checkUrls urlList.txt
urlList.text 的内容:
http://1XXXX http://2XXXX http://3XXXX复制代码
执行上面的命令后 得到一个 result.txt:
http://3XXXX is down http://1XXXX is up http://2XXXX id up复制代码
可以发现结果是无序的。
c. 无序有限制的并行执行 :
同时运行过多的任务会破坏应用的可靠性,控制负载和资源的有效方法就是限制任务的并发执行,减少一次并发的数量。只要在上面无序并行执行的流类中增加一个控制并行执行的数量变量 concurrency 和一个 continueCallback(),并且修改 _transform() 和 _onComplete() 为一下即可:
_transform(chunk,enc,done){ this.running++; this.userTransform(chunk,enc,this._onComplete.bind(this)); if(this.running < this.concurrency){ done(); }else{ this.continueCallback = done; }}_onComplete(err){ this.running--; if(err){ return this.emit('error',err); } const tmpCallback = this.continueCallback; this.continueCallback = null; tmpCallback && tmpCallback(); if(this.running === 0){ this.terminateCallback && this.terminateCallback(); }}复制代码
在上面的 _transform()
方法里,增加 if 判断在执行一个任务时是否还有空闲的资源可以用于下一个任务的执行, 如果当前工作流中任务总数达到了最大限度,就只是简单的把 done()
赋值给 continueCallback , 在 _onComplete()
里在一个任务完成后就可以执行这个 continueCallback 也就是 done()
,这样就延迟了 done()
的执行,也就是延迟了下一批任务的执行,等 _onComplete()
里执行了 continueCallback 表示上一批任务执行完成解除了流的阻塞,才可以执行下一批任务。这样就实现了有限制的并行执行。
d. 顺序并行执行 :
可以使用第三方模块 变化流
const throughParallel = require('through2-parallel');fs.createReadStream(process.argv[2]) .pipe(split()) .pipe(throughParallel.obj({ concurrency:2},(url,enc,done)=>{ // .... }) ) .pipe(fs.createWriteStream('result.txt')) .on('finish',()=>console.lpog('所有 URL 已被校验'))复制代码
(3)流的管道模式:(Node.js 中流的拼接技术)
a. 组合流:
组合流可以对整个管道进行模块化和重用。 组合流的原理和特征:组合流 中第一个是写入流,最后一个是可读流,管道中每个流的错误事件并不会沿着管道自动传递,需要为每一个流添加错误监听器,而组合流应该是一个黑盒,我们不能访问管道里的任何一个流,所以组合流必须简化错误管理机制,只需要对整个组合流添加错误机制而不是对管道中的每一个流。 使用第三方组合流模块比自己实现要简单多了:比如 multipipe、combine-stream
combinedStreams.js:
const zlib = require('zlib');const crypto = require('crypto');const combine = require('multipipe');module.exports.compressAndEncrypt = password => { return combine( zlib.createGzip(), crypto.createCipher('aes192', password) );};module.exports.decryptAndDecoDecompress = password => { return combine( crypto.createDecipher('ase192',password), zlib.createGunzip() )}复制代码
在 archive.js 使用上面这个模块:
const combine = require('multipipe');const fs = require('fs');const compressAndEncryptStream = require('./combinedStreams').compressAndEncrypt;combine( fs.createReadStream(process.argv[3]) .pipe(compressAndEncryptStream(process.argv[2])) .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc"))).on('error', err => { // this error may comes from any stream in the pipeline console.log(err)})复制代码
运行 `node archive mypassword /path/to/a/file.txt'
b. 复制流 :
当想要把相同的数据传输到不同的目标时,或者对同样的数据进行不同的变化或者根据不同的标准来分离的数据的时候需要复制流,将一个可读流传输到多个可写流中。
一个例子?:创建一个文件的可读流并将其复制为两个不同流,分别对俩个流进行 sha1 和 md5 计算。
const fs = require('fs');const crypto = require('crypto');const sha1Stream = crypto.createHash('sha1');sha1Stream.setEncoding('base64');const md5Stream = crypto.createHash('md5');md5Stream.setEncoding('bade64')const inputFile = process.argv[2];const inputStream = fs.createReadstream(inputFile);inputStream .pipe(sha1Stream) .pipe(fs.createWriteStream(inputFile + '.sha1'));inputStream .pipe(md5Stream) .pipe(fs.createWriteStream(inputFile + '.md5'));复制代码
⚠️ 复制流需要注意:复制的两个流会接受相同的数据,对数据的操作要小心以免影响到复制的每一个流。
c. 合并流 :
与复制流相反。将一组可读流合并传输到一个可写流中。需要注意的是,如果默认每一个流都是自动结束,那么就会导致只要有一个流结束了那么目标流也会结束,所以应该使用 { end: false }设置选项使每一个读入流不能自动结束。
一个例子?:将2个不同目录的内容打包为一个。使用2个 npm 包,tar(一个使用流进行打包的库)、fstream(一个用来对文件创建对象流的库) mergeTar.js:
const tar = require('tar')const fstream = require('fstream');const path = require('path')const destination = path.resolve(process.argv[2])const sourceA = path.resolve(process.argv[3])const sourceB = path.recolve(process.argv[4])const pack = tar.Pack()pack.pipe(fstream.Write(destination))let endCount = 0function onEnd(){if(++endCount === 2){ pack.end() }}const sourceStreamA = fstream.Reader({ type:'Directory', path: sourceA}).on('end',onEnd)const sourceStreamB = fstream.Reader({ type:'Directory', path: sourceB}).on('end',onEnd)sourceStreamA.pipe(pack,{ end:false})sourceStreamB.pipe(pack,{ end:false})复制代码
运行 node mergeTar dest.tar /path/to/sourceA /path/to/sourceB
合并流也可以使用一些 npm 包:merge-stream、multistream-merge
d. 复用、分解 :
将多个流合并到一起以便使用单一的流来传输数据的模式就是 复用; 相反,从共享流接收的数据重新构建原始流称为分解。
举一个?:实现一个小程序,复用一个TCP连接,复用此通道的数据流是子线程的标准输出和标准错误,功能是启动一个子线程将其标准输出和标准错误这两个流重定向发送搭配一个远程服务器,然后服务器将两个数据流分别保存到两个独立的文件里。我们使用分组交换技术,每一个组除了数据之外还有一个头部用于标记每一个流的数据,并在分解是将不同的组路由传输到正确的通道。
第一次发布文章,如有错误希望大佬们指出。由于文章是从自己的日常学习笔记中截取下来的,可能在有些内容衔接上有些突兀,如果觉得笔记还可以,更多内容可以前往