node.js 入门教程之十二 -- Streams 流
Streams 是驱动 node.js 程序的核心概念。它提供了处理对文件的读写,网络传输,或者其他端到端的数据交换的更加高效的方式。
streams 流并不是 node.js 首先引入的概念,unix 操作系统在很久之前就在使用了,一个程序可以通过 pipe 管道操作符 |
来传递 streams 流给其他程序。
下面的示例是在 Linux 中,通过 pipe 管道将 cat 读取的文件数据传递给 grep 进行过滤,test.txt 文件内容如下:
aaa bbb
bbb ccc
aaa ccc
$ cat test.txt | grep aaa
aaa bbb
aaa ccc
stream 优点
在传统方法中,当程序读取一个文件内容时会先将文件内容全部读取到内存中,然后再去使用它。而使用 stream 时会一段段的读取文件内容并进行处理,而不需要整体读取到内存。
node.js 的 stream 模块是构建所有 streaming API 的底层。所有的 streams 流都是 eventEmitter 的实例。每种类型的 stream 都有各自的 event 事件,在数据流变化时触发对应事件。
相比于其他数据处理方法,streams 有两个优势:
- 内存效率:在使用数据前不需要将大量数据存入内存中。
- 时间效率:能够更快的进入数据处理阶段,在获取到文件后能够很快的使用它而不需要等待它全部加载完毕。
使用方法
下面我们从一个示例来说明 stream 的使用方法。我们建立一个 http server,当收到请求时读取一个本地文件的内容并作为 response 发送给客户端。
首先传统实现方式如下:
const http = require('http')
const fs = require('fs')
const hostname = '127.0.0.1'
const port = 3000
const server = http.createServer((req, res) => {
fs.readFile(__dirname + '/test.txt', (err, data) => {
if (err) {
console.log('read error')
}
res.end(data)
})
})
server.listen(port, hostname, () => {
console.log(`Server running at http://${hostname}:${port}/`)
})
以上方式在收到请求后,通过 readFile()
读取文件内容到内存中,当成功后会调用 callback 通过 res.end(data)
发送读取的数据给客户端。如果文件较大时以上过程会花费一些时间。
__dirname 返回当前执行文件的路径。
下面介绍使用 stream 实现上面的过程:
const http = require('http')
const fs = require('fs')
const hostname = '127.0.0.1'
const port = 3000
const server = http.createServer((req, res) => {
const stream = fs.createReadStream(__dirname + '/test.txt')
stream.pipe(res)
})
server.listen(port, hostname, () => {
console.log(`Server running at http://${hostname}:${port}/`)
})
通过 fs.createReadStream()
返回一个文件的 stream。
不同于第一种,以上方式会在 stream 中只要有了 data chunk 块就立刻作为 response 数据传输给客户端。这样就会一边读取文件一边传输数据。
pipe()
以上示例中调用 stream 的 pipe()
method。它的功能是建立一个 source stream 源流到 destination stream 目标流的 pipe 管道。这样文件的 stream 流通向了 http response。
pipe()
的返回是 destination stream 目标流,这样就可以很方便的链接多个 pipe:
src.pipe(dest1).pipe(dest2)
以下写法和上面示例效果一样:
src.pipe(dest1)
dest1.pipe(dest2)
基于 stream 的 node.js API
由于 stream 的巨大优势,很多 node.js 核心模块提供了 stream 的原生支持,以下是常用的部分:
- process.stdin 返回一个链接到 stdin 的 stream
- process.stdout 返回一个链接到 stdout 的 stream
- process.stderr 返回一个链接到 stderr 的 stream
- fs.createReadStream() 创建一个文件的可读的 stream
- fs.createWriteStream() 创建一个文件的可写的 stream
- net.connect() 初始化一个基于 stream 的连接
- http.request() 返回一个 http.ClientRequest 实例,它是一个可写的 stream
streams 的类型
有四个 streams 的 calsses:
- Readable: 一个可以作为管道源头的 stream,不能作为管道的目标也就是不能写入数据
- Writable: 一个可以作为管道目标的 stream,不能作为管道源头也就是不能从中获取数据
- Duplex: 一个既可作为管道源头也可以作为目标的 stream
- Transform: 类似于 Duplex
创建 readable stream
从 stream 模块定义一个可读的 stream 然后通过定义 readable._read()
method 内容完成初始化:
const Stream = require('stream')
const readableStream = new Stream.Readable()
readableStream._read = () => {}
初始化也可以这样写:
const readableStream = new Stream.Readable({
read() {}
})
以上就创建了一个可读的 stream,可以给其传输数据:
readableStream.push('hi')
readableStream.push('hello')
可以将这个 stream 和一个可写的 stream 之间建立管道:
readableStream.pipe(process.stdout)
创建 writable stream
通过实例化一个 Writable object 然后通过定义 _write()
method 内容完成初始化:
const Stream = require('stream')
const writeableStream = new Stream.Writable()
writeableStream._write = (chunk, encoding, callback) => {
console.log(chunk.toString())
callback()
}
接收到的数据块传入 chunk,encoding 定义数据类型,callback function 是当一个 chunk 数据块传输完成后调用,此 callback 可在 writeableStream.write 内定义,一般可以是 error 处理,下面会介绍。
可以将定义的可写 stream 同一个可读 stream pipe 管道连接:
process.stdin.pipe(writeableStream)
这样就可以在 stdin 和 writeableStream 间建立 pipe,此时在 stdin 输入数据就会立刻将数据显示在终端。
从可读的 stream 读取数据
下面的示例中,我们建立一个可读的 stream 和一个 可写的 stream,并建立 pipe 管道连接:
const Stream = require('stream')
const readableStream = new Stream.Readable()
readableStream._read = () => {}
const writeableStream = new Stream.Writable()
writeableStream._write = (chunk, encoding, callback) => {
console.log(chunk.toString())
callback()
}
readableStream.pipe(writeableStream)
readableStream.push('hi\n')
readableStream.push('hello\n')
也通过 readable event 事件来处理可读 stream。当 stream 中有准备好的数据块时会触发 readable event:
readableStream.on('readable', () => {
console.log(readableStream.read().toString())
})
给可写的 stream 写入数据
使用 write method 来写入数据:
writeableStream.write('hello world\n')
可以定义写入数据的编码格式和 callback function:
writeableStream.write('hello world\n', 'utf-8', err => {console.log(err)})
实际中 callback function 是否会执行要看 stream 初始化中在 writeableStream._write
是否调用了 callback。
如果数据写入已经完成,可以使用 end method 来告诉可写的 stream:
const Stream = require('stream')
const writeableStream = new Stream.Writable()
writeableStream._write = (chunk, encoding, next) => {
console.log(chunk.toString())
next()
}
process.stdin.pipe(writeableStream)
writeableStream.write('hello world\n')
writeableStream.end()
以上示例中如果在最后一句不调用 end method 则 writeableStream 会持续保持接收来自 stdin 的数据状态。
创建 transform stream
和可写的 stream 创建方式类似,可读也可写:
const Stream = require('stream')
const transformStream = new Stream.Transform()
transformStream._transform = (chunk, encoding, callback) => {
console.log('transform' + chunk.toString())
transformStream.push(chunk)
callback()
}
const writeableStream = new Stream.Writable()
writeableStream._write = (chunk, encoding, callback) => {
console.log('write' + chunk.toString())
callback()
}
process.stdin.pipe(transformStream).pipe(writeableStream)
在 transformStream._transform
定义了当 transform stream 接收到数据后通过 transformStream.push
将数据发到 readable stream 中,这样其他 stream 就可以读取到它接收到的数据了。
更多 stream 使用方法参考:https://nodejs.org/api/stream.html
标签:无