stream

您所在的位置:网站首页 stream! stream

stream

#stream| 来源: 网络整理| 查看: 265

第一个stream的例子: const fs = require('fs') const stream = fs.createWriteStream('./big_file.txt') for(let i=0;i{ fs.readFile('./big_file.txt',(error,data)=>{ if(error) throw error response.end(data) console.log('done') }) }) server.listen(8888) console.log('started-----8888')

此时我们创建一个http服务,当我们试图读取刚刚创建的文件时,发现node.js的服务内存瞬间增加了100MB,此时如果存在多个请求,将会对机器造成很大的压力.

2. 通过createReadStream读取文件; const fs = require('fs') const http = require('http') const server = http.createServer() server.on('request',(request,response)=>{ const stream = fs.createReadStream('./big_file.txt') stream.pipe(response) }) server.listen(8888) console.log('started at 8888')

此时我们可以看到node.js服务的内存只增加了20多MB,虽然读取的速度不如比之前fs.readFile慢一些。

管道pipe

两个流可以用一个管道相连,stream1的末尾连上stream2开头

stream1.pipe(stream2) //链式操作 a.pipe(b).pipe(c) Stream对象的原型链 const s = fs.createReadStream(path)

s的对象层级为: 自身属性(由fs.ReadStream构造) 原型:stream.Readable.prototype 二级原型:stream.Stream.prototype 三级原型:events.EventEmitter.prototype 四级原型:Object.prototype

image.pngimage.png image.pngimage.png Stream 分类 Readable 可读 Writable 可写 Duplex 可读可写(双向,默认读写分离,互相不干扰) Transform 可读可写(变化,相当于一个转换器,比如babel写入es6,然后读取的是es5) Readable stream 默认处于paused态 添加data事件,变成flowing态 删掉data事件,变为paused态 调用pause()可以变为paused态 调用resume()可以变为flowing态 const fs = require('fs') const http = require('http') const server = http.createServer() server.on('request',(request,response)=>{ const stream = fs.createReadStream('./big_file.txt') stream.pipe(response) stream.pause() }) server.listen(8888) console.log('started-----8888')

此时我们发送请求,无法获得任何的响应,因为调用了pause()

stream.pipe(response) stream.pause() setTimeout(()=>{ console.log('3s 后恢复') stream.resume() },3000)

我们在3秒后调用resume(),此时响应恢复

创建一个readable stream const {Readable} = require('stream') const inStream = new Readable() inStream.push('ABCDEFG') inStream.push('EFGHI') inStream.push(null) inStream.pipe(process.stdout)

目前的方法不是按需供给的

const { Readable } = require("stream"); const inStream = new Readable({ read(size) { const char = String.fromCharCode(this.currentCharCode++) this.push(char); console.log(`push ${char}`) if (this.currentCharCode > 90) { this.push(null); } } }) inStream.currentCharCode = 65 inStream.pipe(process.stdout)

用户调用read才会调用

Writable stream 1.drain事件

我们在调用stream.write(chunk)的时候可能会得到false,表示写的太快,数据积压,要监听到drain事件后才能继续write

2. finish事件

调用stream.end()后,且缓冲区数据都传递给底层系统后,触发finish事件

创建一个writable stream const {Writable} = require('stream') const outStream = new Writable({ write(chunk, encoding, callback) { console.log('get user input:') console.log(chunk.toString()) callback() } }) process.stdin.pipe(outStream)

这时我们输入一个值,终端会出现你输入的值

Duplex stream const { Duplex } = require("stream"); const inoutStream = new Duplex({ write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); }, read(size) { this.push(String.fromCharCode(this.currentCharCode++)); if (this.currentCharCode > 90) { this.push(null); } } }); inoutStream.currentCharCode = 65; process.stdin.pipe(inoutStream).pipe(process.stdout); Transform stream const { Transform } = require("stream"); const upperCaseTr = new Transform({ transform(chunk, encoding, callback) { this.push(chunk.toString().toUpperCase()); callback(); } }); process.stdin.pipe(upperCaseTr).pipe(process.stdout); 创建一个gzip流 const fs = require("fs"); const zlib = require("zlib"); const file = process.argv[2]; fs.createReadStream(file) .pipe(zlib.createGzip()) .pipe(fs.createWriteStream(file + ".gz"));


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3