翼度科技»论坛 编程开发 JavaScript 查看内容

[NodeJS] Streams流式数据处理

3

主题

3

帖子

9

积分

新手上路

Rank: 1

积分
9
在现代应用开发中,数据处理的效率和资源管理尤为重要。NodeJS作为一种高效的JavaScript运行时环境,通过其强大的流(Stream)功能,提供了处理大规模数据的便捷方式。流式数据处理不仅能够优化内存使用,还可以提高数据处理的实时性和效率。下文将介绍NodeJS中的流概念、流的类型以及如何利用流来进行数据传输和处理。
流的基本概念

流式数据的特点是将数据分成一个一个的chunk,每次操作只针对其中的一小部分。
因此流式数据的读写操作不需要将整个数据保存在内存中(处理完就丢掉)。
常用于视频这种包含大量数据的应用场景,也可以在时间和空间角度上更有效地处理数据:

  • 时间:从开始读到流就可以处理数据并反馈给用户了,不需要等待全部数据到达,例如:ChatGPT的回答,就是流式数据传输,一个字一个字地显示出来;
  • 空间:如上文所说,在某些场景下不需要将整个数据保存在内存中。
NodeJS提供的API

NodeJS中的node:stream模块提供了对流数据进行处理的抽象接口。
NodeJS中的所有流对象都可以监听和触发事件,都是EventEmitter的实例对象。
下面的表格列出了每一种基本流常用且重要的事件
NodeJS中有四种基本的流类型:可读流、可写流、双工流和转换流。
描述案例事件方法可读流 Readable Streams可用于读(消费)数据1. http request
2. fs read streamsdata
endpipe()
read可写流 Writable Streams可用于写(生产)数据1. http responses
2. fs write streamsdrain
finishwrite()
end()双工流 Duplex Streams可读可写net  网络套接字转换流 Transform Streams双工流,在读写的时候可修改zlib Gzip creation流式数据传输案例

简介:创建一个比较大的文本文件,使用NodeJS启动一个服务,接口分别以三种方法返回文件内容。
代码
方法一 不使用流

读取整个文件的内容之后再返回;
读取大文件的时候不推荐这样写,因为整个文件会先被完整地从磁盘读取到内存中,再返回给客户端。
  1. import fs from 'node:fs';
  2. import http from 'node:http';
  3. const server = http.createServer();
  4. server.on('request', (req, res)=>{
  5.   // CORS
  6.   res.setHeader('Access-Control-Allow-Origin', '*');
  7.   // Solution 1
  8.   fs.readFile('test.txt', (err, data)=>{
  9.     if(err)console.log(err);
  10.     res.end(data);
  11.   });
  12. });
  13. server.listen(3000, ()=>{
  14.   console.log('listening...');
  15. });
复制代码
方法二 可读流

使用可读流,优点是边读文件边返回,只有当前处理的chunk会占据内存;
  1. import fs from 'node:fs';
  2. import http from 'node:http';
  3. const server = http.createServer();
  4. server.on('request', (req, res)=>{
  5.   // CORS
  6.   res.setHeader('Access-Control-Allow-Origin', '*');
  7.   // Solution 2: Streams
  8.   const readable = fs.createReadStream('test.txt');
  9.   readable.on('data', (chunk)=>{
  10.     res.write(chunk);
  11.   });
  12.   readable.on('end', ()=>{
  13.     res.end();
  14.   });
  15.   readable.on('error', (err)=>{
  16.     console.log(err);
  17.     res.statusCode = 500;
  18.     res.end('File reading error!');
  19.   });
  20. });
  21. server.listen(3000, ()=>{
  22.   console.log('listening...');
  23. });
复制代码
backpressure

这里介绍一下流控(Flow Controll)领域中的一个名词:Backpressure(翻译为 反压/背压)。
在Node.js和其他流处理系统中,backpressure(反压/背压)是指生产者生成数据的速度超过消费者处理数据的速度时产生的一种控制机制。
当可读流(Readable Stream)读取数据的速度快于可写流(Writable Stream)写入数据的速度时,就会产生backpressure。为了防止这种情况,可读流会根据可写流的消费能力进行控制,暂停或减慢读取数据的速度。
具体机制

  • 可写流的缓冲区:可写流内部有一个缓冲区,用于暂存数据。如果这个缓冲区被填满,流会返回 false,表示消费者已经无法及时处理更多的数据。
  • 暂停和恢复:当可写流返回 false 时,可读流会暂停读取数据。只有在可写流的缓冲区有足够的空间后,可读流才会恢复读取。
  • 事件驱动:Node.js 流通过事件驱动的方式处理backpressure。当可写流的缓冲区有空间时,会触发 drain 事件,通知可读流继续读取数据。
示例代码:通过手动暂停和恢复合理利用缓冲区,避免数据丢失、内存溢出和资源耗尽。
  1. import fs from 'node:fs';
  2. const readableStream = fs.createReadStream('input.txt');
  3. const writableStream = fs.createWriteStream('output.txt');
  4. readableStream.on('data', (chunk)=>{
  5.   const canWrite = writableStream.write(chunk);
  6.   // 可写流的缓冲区空间不够了,暂停读数据(生产)
  7.   if(!canWrite){
  8.     readableStream.pause();
  9.   }
  10. });
  11. // 当可写流的缓冲区空间足够,会触发`drain`事件
  12. // 可以继续读数据
  13. writableStream.on('drain', ()=>{
  14.   readableStream.resume();
  15. });
  16. // 读取结束,停止写入
  17. readableStream.on('end', ()=>{
  18.   writableStream.end();
  19.   console.log('done.');
  20. });
复制代码
pipe

在 Node.js 中,pipe 方法提供了一种更简单和自动化的方式来处理流之间的 backpressure 问题。pipe 方法可以连接可读流和可写流,并自动处理 backpressure,无需手动暂停和恢复流。
示例代码
  1. import fs from 'node:fs';
  2. const readableStream = fs.createReadStream('input.txt');
  3. const writableStream = fs.createWriteStream('output.txt');
  4. // 统一错误处理函数
  5. function handleError(err) {
  6.   console.error('发生错误:', err);
  7. }
  8. // 使用 pipe 连接可读流和可写流,并处理错误
  9. readableStream.pipe(writableStream)
  10.   .on('error', handleError);
  11. // 处理可读流和可写流的错误
  12. readableStream.on('error', handleError);
  13. writableStream.on('error', handleError);
复制代码
语法是:
  1. readableSource.pipe(writableDestination);
复制代码
接下来回到上文的关于流式数据网络传输的案例。
方法三 pipe

使用pipe可以简化许多代码,核心代码就是
  1. readable.pipe(res);
复制代码
示例代码:
  1. import fs from 'node:fs';
  2. import http from 'node:http';
  3. const server = http.createServer();
  4. server.on('request', (req, res)=>{
  5.   // CORS
  6.   res.setHeader('Access-Control-Allow-Origin', '*');
  7.   // Solution 3: Pipe
  8.   const readable = fs.createReadStream('test.txt');
  9.   readable.pipe(res).on('error', ()=>{
  10.     res.statusCode = 500;
  11.     res.end('File reading error!');
  12.   });
  13. });
  14. server.listen(3000, ()=>{
  15.   console.log('listening...');
  16. });
复制代码
总结


  • 流(Stream)在NodeJS中的工作原理是将数据分成一个个小块进行处理,这样无需将整个数据加载到内存中,从而优化了内存使用和数据处理效率。
  • 流在NodeJS中有四种基本类型:可读流、可写流、双工流和转换流,每种类型都有其特定的应用场景和事件机制。
  • 流的应用场景主要包括视频播放、文件处理、实时数据传输等。在这些场景中,流通过边读边写、边处理边传输的方式,可以有效地提高数据处理的实时性和系统的性能。
参考

[1]
[2] 知乎 - 如何形象的描述反应式编程中的背压(Backpressure)机制?

来源:https://www.cnblogs.com/feixianxing/p/18290492/node-js-streams
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具