Node 流(stream) (可读流、可写流、双工流、转换流)

您所在的位置:网站首页 pipe翻译 Node 流(stream) (可读流、可写流、双工流、转换流)

Node 流(stream) (可读流、可写流、双工流、转换流)

2023-10-18 20:28| 来源: 网络整理| 查看: 265

文章已同步至【个人博客】,欢迎访问【我的主页】😃 文章地址:blog.fanjunyang.zone/archives/no…

流是什么?

顾名思义,流就是数据流动的意思。

举个例子:

比如小区停水了,没想到你家用无塔供水器存了一点水,然而隔壁没有水了,想向你家借点水,直接用桶运水吧,费力气,还有可能浪费,于是可以用根管子,连接你们两家,这样就可以通过管子直接把水流到隔壁家

这就类似于request对象向服务器发请求要资源,在这request请求资源的传播方式通过流来实现

为什么使用流?

一般我们处理数据有两种模式:buffer模式、stream模式

buffer模式:取完数据一次性操作 stream模式:边取数据边操作

举个例子:

你想用手机看部电影,用buffer模式就是你把这个电影全部缓存下来,然后再看。用stream模式,就是你边缓存边看

所以从这里就可以看出stream模式无论是在空间和时间上都优于buffer模式:

空间上:内存只会占用当前需要处理的一块数据区域的大小, 而不是整个文件 时间上:因为不需要全部的数据就可以开始处理, 所以时间就相当于是节约了

还有一个好处就是可以链式调用

如果说写入的速度跟不上读取的速度,就有可能导致数据丢失 所以说正常的情况应该是,写完一段,再读取下一段,如果没有写完的话,就让读取流先暂停,等写完再继续,(也就是你看电影的时候,缓存着看着,如果你网络不好,没缓存,就看不了,等再缓存点,再看) 所以为了让可读流和可写流速度一致,就要用到流中必不可少的属性pipe了,pipe翻译过来意思是管道,就如上面的例子中的管子一样

流的类型

node中有四种基本流类型:

Readable 可读流 Writable 可写流 Duplex 可读可写流(双工流) Transform 在读写过程中可以修改和变换数据的Duplex流(转换流)

流中的数据有两种模式:

二进制模式,都是 string 字符串 和 buffer。 对象模式,流内部处理的是一系统普通对象。 可读流(Readable)

可读流中的两种模式:

流动模式 ( flowing ) :数据自动从系统底层读取,并通过事件,尽可能快地提供给应用程序。 暂停模式 ( paused ),必须显式的调用 read() 读取数据。

可读流都开始于 暂停模式

暂停模式 切换到 流动模式: 1.添加 data 事件回调。 2.调用 resume()。 3.调用 pipe()。

流动模式 切换到 暂停模式 1.如果没有管道目标,调用 pause()。 2.如果有管道目标,移除所有管道目标,调用 unpipe() 移除多个管道目标。

createReadStream 方法有两个参数:

1.第一个参数是读取文件的路径 2.第二个参数为 options 选项,其中有八个参数:

flags:标识位,默认为 r; encoding:字符编码,默认为 null; fd:文件描述符,默认为 null; mode:权限位,默认为 0o666; autoClose:是否自动关闭文件,默认为 true; start:读取文件的起始位置; end:读取文件的(包含)结束位置; highWaterMark:最大读取文件的字节数,默认 64 * 1024。

创建可读流,并监听事件:

const fs = require('fs'); //创建一个文件可读流 let rs = fs.createReadStream('./1.txt', { //文件系统标志 flags: 'r', //数据编码,如果调置了该参数,则读取的数据会自动解析 //如果没调置,则读取的数据会是 Buffer //也可以通过 rs.setEncoding() 进行设置 encoding: 'utf8', //文件描述符,默认为null fd: null, //文件权限, mode: 0o666, //文件读取的开始位置 start: 0, //文件读取的结束位置(包括结束位置) end: Infinity, //读取缓冲区的大小,默认64K highWaterMark: 3 }); //文件被打开时触发 rs.on('open', function () { console.log('文件打开'); }); //监听data事件,会让当前流切换到流动模式 //当流中将数据传给消费者后触发 //由于我们在上面配置了 highWaterMark 为 3字节,所以下面会打印多次。 rs.on('data', function (data) { console.log(data); }); //流中没有数据可供消费者时触发 rs.on('end', function () { console.log('数据读取完毕'); }); //读取数据出错时触发 rs.on('error', function () { console.log('读取错误'); }); //当文件被关闭时触发 rs.on('close', function () { console.log('文件关闭'); });

注:open 和 close 事件并不是所有流都会触发。

当们监听data事件后,系统会尽可能快的读取出数据。但有时候,我们需要暂停一下流的读取,操作其他事情。

这时候就需要用到 pause() 和 resume() 方法。

const fs = require('fs'); //创建一个文件可读流 let rs = fs.createReadStream('./1.txt', { highWaterMark: 3 }); rs.on('data', function (data) { console.log(`读取了 ${data.length} 字节数据 : ${data.toString()}`); //使流动模式的流停止触发'data'事件,切换出流动模式,数据都会保留在内部缓存中。 rs.pause(); //等待3秒后,再恢复触发'data'事件,将流切换回流动模式。 setTimeout(function () { rs.resume(); }, 3000); });

可读流的 readable 事件,当流中有数据可供读取时就触发。 注意当监听 readable 事件后,会导致流停止流动,需调用 read() 方法读取数据。 注:on('data'),on('readable'),pipe() 不要混合使用,会导致不明确的行为。

const fs = require('fs'); let rs = fs.createReadStream('./1.txt', { highWaterMark: 1 }); //当流中有数据可供读取时就触发 rs.on('readable', function () { let data; //循环读取数据 //参数表示要读取的字节数 //如果可读的数据不足字节数,则返回缓冲区剩余数据 //如是没有指定字节数,则返回缓冲区中所有数据 while (data = rs.read()) { console.log(`读取到 ${data.length} 字节数据`); console.log(data.toString()); } }); 可写流(Writable)

createWriteStream 方法有两个参数:

1.第一个参数是读取文件的路径 2.第二个参数为 options 选项,其中有七个参数:

flags:标识位,默认为 w; encoding:字符编码,默认为 utf8; fd:文件描述符,默认为 null; mode:权限位,默认为 0o666; autoClose:是否自动关闭文件,默认为 true; start:写入文件的起始位置; highWaterMark:一个对比写入字节数的标识,默认 16 * 1024。

创建可写流,并监听事件:

const fs = require('fs'); //创建一个文件可写流 let ws = fs.createWriteStream('./1.txt', { highWaterMark: 3 }); //往流中写入数据 //参数一表示要写入的数据 //参数二表示编码方式 //参数三表示写入成功的回调 //缓冲区满时返回false,未满时返回true。 //由于上面我们设置的缓冲区大小为 3字节,所以到写入第3个时,就返回了false。 console.log(ws.write('1', 'utf8')); console.log(ws.write('2', 'utf8')); console.log(ws.write('3', 'utf8')); console.log(ws.write('4', 'utf8')); function writeData() { let cnt = 9; return function () { let flag = true; while (cnt && flag) { flag = ws.write(`${cnt}`); console.log('缓冲区中写入的字节数', ws.writableLength); cnt--; } }; } let wd = writeData(); wd(); //当缓冲区中的数据满的时候,应停止写入数据, //一旦缓冲区中的数据写入文件了,并清空了,则会触发 'drain' 事件,告诉生产者可以继续写数据了。 ws.on('drain', function () { console.log('可以继续写数据了'); console.log('缓冲区中写入的字节数', ws.writableLength); wd(); }); //当流或底层资源关闭时触发 ws.on('close', function () { console.log('文件被关闭'); }); //当写入数据出错时触发 ws.on('error', function () { console.log('写入数据错误'); });

写入流的 end() 方法 和 finish 事件监听

const fs = require('fs'); //创建一个文件可写流 let ws = fs.createWriteStream('./1.txt', { highWaterMark: 3 }); //往流中写入数据 //参数一表示要写入的数据 //参数二表示编码方式 //参数三表示写入成功的回调 //缓冲区满时返回false,未满时返回true。 //由于上面我们设置的缓冲区大小为 3字节,所以到写入第3个时,就返回了false。 console.log(ws.write('1', 'utf8')); console.log(ws.write('2', 'utf8')); console.log(ws.write('3', 'utf8')); console.log(ws.write('4', 'utf8')); //调用end()表明已经没有数据要被写入,在关闭流之前再写一块数据。 //如果传入了回调函数,则将作为 'finish' 事件的回调函数 ws.end('最后一点数据', 'utf8'); //调用 end() 且缓冲区数据都已传给底层系统时触发 ws.on('finish', function () { console.log('写入完成'); });

写入流的 cork() 和 uncork() 方法,主要是为了解决大量小块数据写入时,内部缓冲可能失效,导致的性能下降。

const fs = require('fs'); let ws = fs.createWriteStream('./1.txt', { highWaterMark: 1 }); //调用 cork() 后,会强制把所有写入的数据缓冲到内存中。 //不会因为写入的数据超过了 highWaterMark 的设置而写入到文件中。 ws.cork(); ws.write('1'); console.log(ws.writableLength); ws.write('2'); console.log(ws.writableLength); ws.write('3'); console.log(ws.writableLength); //将调用 cork() 后的缓冲数据都输出到目标,也就是写入文件中。 ws.uncork();

注意 cork() 的调用次数要与 uncork() 一致。

const fs = require('fs'); let ws = fs.createWriteStream('./1.txt', { highWaterMark: 1 }); //调用一次 cork() 就应该写一次 uncork(),两者要一一对应。 ws.cork(); ws.write('4'); ws.write('5'); ws.cork(); ws.write('6'); process.nextTick(function () { //注意这里只调用了一次 uncork() ws.uncork(); //只有调用同样次数的 uncork() 数据才会被输出。 ws.uncork(); }); 双工流(Duplex)

Duplex(双工)流实际上是继承了Readable和Writable的一类流,一个Duplex对象既可当成可读流来使用,也可以当做可写流来使用。 所以需要继承Duplex类: 1.继承 Duplex 类 2.实现 _read() 方法 3.实现 _write() 方法

实现了_read()方法后,可以监听data事件来消耗Duplex产生的数据 实现了_write()方法后,便可以作为下游去消耗数据

相信大家对 read()、write() 方法的实现不会陌生,因为和 Readable、Writable 完全一样。

const Duplex = require('stream').Duplex; const myDuplex = new Duplex({ read(size) { // ... }, write(chunk, encoding, callback) { // ... } });

在实例化 Duplex 类的时候可以传递几个参数:

readableObjectMode : 可读流是否设置为 ObjectMode,默认 false writableObjectMode : 可写流是否设置为 ObjectMode,默认 false allowHalfOpen : 默认 true, 设置成 false 的话,当写入端结束的时,流会自动的结束读取端,反之亦然。

例子:

const Duplex = require('stream').Duplex; const kSource = Symbol('source'); class MyDuplex extends Duplex { constructor(source, options) { super(options); this[kSource] = source; } _write(chunk, encoding, callback) { // The underlying source only deals with strings if (Buffer.isBuffer(chunk)) chunk = chunk.toString(); this[kSource].writeSomeData(chunk); callback(); } _read(size) { this[kSource].fetchSomeData(size, (data, encoding) => { this.push(Buffer.from(data, encoding)); }); } }

这是不能执行的伪代码,但是可以看出来 Duplex 的作用,即可以生产数据,又可以消费数据,所以才可以处于数据流动管道的中间环节。

转换流(Transform)

在Duplex流中,可读流中的数据和可写流中的数据是分开的。 而Transform流是一种特殊的Duplex流,它继承自Duplex流,其可写端的数据经变换后会自动添加到可读端

Tranform 类内部继承了 Duplex 并实现了 writable.write() 和 readable._read() 方法

所以当我们自定义 Transform 流时,只需要: 1.继承 Transform 类 2.实现 _transform() 方法 3.实现 _flush() 方法(可以不实现)

_transform(chunk, encoding, callback) 方法用来接收数据,并产生输出,参数我们已经很熟悉了,和 Writable 一样, chunk 默认是 Buffer,除非 decodeStrings 被设置为 false。

在 _transform() 方法内部可以调用 this.push(data) 生产数据,交给可写流,也可以不调用,意味着输入不会产生输出。

当数据处理完了必须调用 callback(err, data) ,第一个参数用于传递错误信息,第二个参数可以省略,如果被传入了,效果和 this.push(data) 一样

transform.prototype._transform = function (data, encoding, callback) { this.push(data); callback(); }; transform.prototype._transform = function (data, encoding, callback) { callback(null, data); };

有些时候,transform 操作可能需要在流的最后多写入可写流一些数据。例如, Zlib流会存储一些内部状态,以便优化压缩输出。在这种情况下,可以使用_flush()方法,它会在所有写入数据被消费、触发 end之前被调用。

Transform 事件

Transform 流有两个常用的事件: 1.来自 Writable 的 finish 2.来自 Readable 的 end

当调用 transform.end() 并且数据被 _transform() 处理完后会触发 finish,调用_flush后,所有的数据输出完毕,触发end事件。

pipe()方法

pipe() 方法类似下面的代码,在可读流与可写流之间连接一个管道。

const fs = require('fs'); //创建一个可读流 let rs = fs.createReadStream('./1.txt', { highWaterMark: 3 }); //创建一个可写流 let ws = fs.createWriteStream('./2.txt', { highWaterMark: 3 }); rs.on('data', function (data) { let flag = ws.write(data); console.log(`往可写流中写入 ${data.length} 字节数据`); //如果写入缓冲区已满,则暂停可读流的读取 if (!flag) { rs.pause(); console.log('暂停可读流'); } }); //监控可读流数据是否读完 rs.on('end', function () { console.log('数据已读完'); //如果可读流读完了,则调用 end() 表示可写流已写入完成 ws.end(); }); //如果可写流缓冲区已清空,可以再次写入,则重新打开可读流 ws.on('drain', function () { rs.resume(); console.log('重新开启可读流'); });

用 pipe() 方法完成上面的功能。

const fs = require('fs'); //创建一个可读流 let rs = fs.createReadStream('./1.txt', { highWaterMark: 3 }); //创建一个可写流 let ws = fs.createWriteStream('./2.txt', { highWaterMark: 3 }); let ws2 = fs.createWriteStream('./3.txt', { highWaterMark: 3 }); //绑定可写流到可读流,自动将可读流切换到流动模式,将可读流的所有数据推送到可写流。 rs.pipe(ws); //可以绑定多个可写流 rs.pipe(ws2);

也可以用 unpipe() 手动的解绑可写流。

const fs = require('fs'); //创建一个可读流 let rs = fs.createReadStream('./1.txt', { highWaterMark: 3 }); //创建一个可写流 let ws = fs.createWriteStream('./2.txt', { highWaterMark: 3 }); let ws2 = fs.createWriteStream('./3.txt', { highWaterMark: 3 }); rs.pipe(ws); rs.pipe(ws2); //解绑可写流,如果参数没写,则解绑所有管道 setTimeout(function () { rs.unpipe(ws2); }, 0); >_


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3