浅析node中流应用(一) 可读流(fs.createReadStream)

您所在的位置:网站首页 fs文件是什么 浅析node中流应用(一) 可读流(fs.createReadStream)

浅析node中流应用(一) 可读流(fs.createReadStream)

2023-11-27 22:44| 来源: 网络整理| 查看: 265

为什么要需要流? 当我们学习新知识的时候,首先我们知道为什么要学习,那我们为什么要学习流?因为在在node中读取文件的方式有来两种,一个是利用fs模块,一个是利用流来读取。如果读取小文件,我们可以使用fs读取,fs读取文件的时候,是将文件一次性读取到本地内存。而如果读取一个大文件,一次性读取会占用大量内存,效率很低,这个时候需要用流来读取。流是将数据分割段,一段一段的读取,可以控制速率,效率很高,不会占用太大的内存。gulp的task任务,文件压缩,和http中的请求和响应等功能的实现都是基于流来实现的。因此,系统学习下流还是很有必要的 可读流用法(先把用法学会) node中读是将内容读取到内存中,而内存就是Buffer对象 流都是基于原生的fs操作文件的方法来实现的,通过fs创建流。所有的 Stream 对象都是 EventEmitter 的实例。常用的事件有: open -打开文件 data -当有数据可读时触发。 error -在读收和写入过程中发生错误时触发。 close -关闭文件 end - 没有更多的数据可读时触发 创建可读流 统一下 1.txt中的内容 1234567890 let fs = require('fs'); let rs = fs.createReadStream('./1.txt',{ highWaterMark:3, //文件一次读多少字节,默认 64*1024 flags:'r', //默认 'r' autoClose:true, //默认读取完毕后自动关闭 start:0, //读取文件开始位置 end:3, //流是闭合区间 包含start也含end encoding:'utf8' //默认null }); 注意: 默认创建一个流 是非流动模式,默认不会读取数据 具体参数说明,我们可以参考下node官网详细介绍 http://nodejs.cn/api/fs.html#fs_fs_createreadstream_path_options 监听open事件 rs.on("open",()=>{ console.log("文件打开") }); 监听data事件

可读流这种模式它默认情况下是非流动模式(暂停模式),它什么也不做,就在这等着

监听了data事件的话,就可以将非流动模式转换为流动模式

流动模式会疯狂的触发data事件,直到读取完毕

直接上代码

//1.txt中内容为1234567890 let fs = require('fs'); let rs = fs.createReadStream('./1.txt',{ highWaterMark:3, //文件一次读多少字节,默认 64*1024 flags:'r', //默认 'r' autoClose:true, //默认读取完毕后自动关闭 start:0, //读取文件开始位置 end:3, //流是闭合区间 包含start也含end encoding:'utf8' //默认null }); rs.on("open",()=>{ console.log("文件打开") }); //疯狂触发data事件 直到读取完毕 rs.on('data',(data)=>{ console.log(data); //共读4个字节,但是highWaterMark为3,所以触发2次data事件,分别打印123 4 });

监听err/end/close事件 rs.on("err",()=>{ console.log("发生错误") }); rs.on('end',()=>{ //文件读取完毕后触发 console.log("读取完毕"); }); rs.on("close",()=>{ //最后文件关闭触发 console.log("关闭") });

不要急,最后把方法介绍完统一写个例子,大家一看便一目了之

最后介绍两个方法就大功告成啦 rs.pause() 暂停读取,会暂停data事件的触发,将流动模式转变非流动模式 rs.resume()恢复data事件,继续读取,变为流动模式

终于把可读流的所有API讲完了,迫不及待的写个完整的案例来体验下,说干就干

用法讲完了,可以开始写写源码实现了,一共不到100行,来慢慢感受下,还记得刚开始的时候说的所有的 Stream 对象都是 EventEmitter 的实例。如果对发布订阅模式中EventEmitter还不了解,可以读下上篇文章发布订阅模式还不会??戳这里,50行核心代码,手把手教你学会 手写可读流 一、准备工作,构建可读流构造函数 记住Stream 对象都是 EventEmitter 的实例,内部是通过发布订阅模式实现的。直接贴代码 let fs = require('fs'); let EventEmitter = require('events'); class ReadStream extends EventEmitter { //创建可读流类,继承 EventEmitter constructor(path, options = {}) { //options默认空对象 super(); this.path = path; this.highWaterMark = options.highWaterMark || 64 * 1024; this.autoClose = options.autoClose || true; this.start = options.start || 0; this.pos = this.start; //pos会随着读取的位置改变 this.end = options.end || null; this.encoding = options.encoding || null; this.flags = options.flags || 'r'; this.flowing = null; //非流动模式 //声明一个buffer表示都出来的数据 this.buffer = Buffer.alloc(this.highWaterMark); this.open(); //打开文件 fd } 其实只是赋值了很多默认值,没有什么难点,接下来就要写this.open()方法,即打开文件 二、在ReadStream原型中写open方法 废话不多说,直接上代码,代码中有详细的代码解释 //打开文件用 open() { fs.open(this.path, this.flags, (err, fd) => { //fd标识的就是当前this.path这个文件,从3开始(number类型) if (err) { if (this.autoClose) { //如果需要自动关闭我再去销毁fd this.destroy(); //关闭文件(触发关闭事件) } this.emit('error', err); //打开文件发生错误,发布error事件 } this.fd = fd; //保存文件描述符 this.emit('open', this.fd) //触发文件open方法 }) } 想下,打开文件我们做了两件事, 1、如果发生错误,关闭文件,同时发射 "error"事件 2、如果没有错误,保存fd,然后发射 "open"事件 先来实现下this.destroy()关闭文件的方法 三、实现destroy()方法 destroy() { if (typeof this.fd != 'number') { //文件未打开,也要关闭文件且触发close事件 return this.emit('close'); } fs.close(this.fd, () => { //如果文件打开过了 那就关闭文件并且触发close事件 this.emit("close"); }) } 这样一来,rs.on('open')已经实现了,我们来测试下吧

四、实现主要的read方法真的读文件,于rs.on('data')方法对应 1、确保真的拿到fd(文件描述符,默认3,number类型) 2、确保拿到fd后,对fs.read中howMuchToRead有一个绕的算法,多举几个例子理解更好,如果对fs.read不了解,戳这里,fs.read()方法介绍 3、异步递归去读文件,读完为止。 4、说了这么多,直接干。 read() { //此时文件还没打开 if (typeof this.fd != 'number') { //当文件真正打开的时候 会触发open事件,触发事件后再执行read,此时fd 就有了 return this.once('open', () => this.read()) } //此时有fd了 开始读取文件了 //this.pos是变量,开始时this.pos = this.start,在上面定义过了 //算法有点绕,源码中是这样实现的。举个例子 end=3,pos=0,highWaterMark=3, howMuchToRead = 3, 1.txt内容1234 就会读123 4 let howMuchToRead = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark; fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err, byteRead) => { // byteRead真实读到的个数 this.pos += byteRead; // this.buffer默认三个 let b = this.buffer.slice(0, byteRead); //对读到的b进行编码 b = this.encoding ? b.toString(this.encoding) : b; //把读取到的buffer发射出去 this.emit('data', b); if ((byteRead === this.highWaterMark) && this.flowing) { return this.read(); } //这里没有更多逻辑了 if (byteRead < this.highWaterMark) { //没有更多了 this.emit('end'); //读取完毕 this.destroy(); //销毁完毕 } }) }

大家会发现,此时我们还没有监听 rs.on('data')事件,来触发read方法,此时我们需要修改下 第一步创建构造函数的代码

constructor(path, options = {}) { //省略.... 代码和第一步一样,下面是新添加 // 看是否监听了data事件,如果监听了就要变成流动模式 this.on('newListener', (eventName, callback) => { if (eventName === 'data') { //相当于用户监听了data事件 this.flowing = true; // 监听了就去读 this.read(); //去读内容 } }) }

如果能看到这里,就基本大功告成,就只剩下pause和resume 暂停和恢复暂停方法。那就一写到底

五、添加pause暂停 和resume恢复暂停方法 两个方法非常简单,就直接贴代码 pause() { this.flowing = false; } resume() { this.flowing = true; //恢复暂停,在去无限读 this.read(); }

终于大功告成,写的对不对呢,赶紧测试下吧,期待的搓手手

end 我们已经实现了可读流实现,后续还会有可写流实现。api虽然枯燥,希望大家还是多写写源码 对源码感兴趣,我把源码放在github上 ,供大家参考


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3