深入理解node的web stream模块

2023-05-16

深入理解node的web stream模块

  • 提示:需要掌握node传统的流以及事件机制
  • node环境:v16.5.0+
  • 一下内容全部以node v18.12.0实验为基础
  • 如果观看期间发现了一些不认识的api,那就是我在用node18的api,可以自行观看node官方文档,很简单的!😄

专业术语

  • 内置队列或缓存:可以理解为node根据流输入的数据,用一个链表数据结构建立的缓存,读取、写出的内容都需要经过缓存。(参考专业说法:内置队列MDN)

  • highWaterMark水平线或阈值:内置队列需要设立上限,否则会突破node的内存限制大小,从而成为一种攻击手段https://nodejs.org/docs/latest-v18.x/api/stream.html#writablewritechunk-encoding-callback

攻击手段:A给node服务发送world,让它帮A转换html文件,A扮演着发送流,也扮演着接受流;此时A只发送,但是决绝接受,一旦这个文件超过node内存限制,也就意味着这个node服务将会内存泄露,从而宕机,hack成功!

  • ⚠️highWaterMark注意点:传统stream中highWaterMark只是一个警示作用,而不是强制行为,也就是意味着超过了highWaterMark将任然可以继续往内置队列里填充数据,直到超过内存限制

核心知识点

  • node能使用的内存大小?(为什么不说web,虽然没有刻意去了解web,V8内存管理和node一致;但其他GUI渲染内存加上去绝对和node内存不一样)

这个相信大家都知道,新生代(32 位系统分配 16M 的内存空间,64 位系统翻倍 32M),老生代(64位系统下约为1.4GB,32位系统下约为0.7GB),也就是我们能用V8进行内存管理js堆内存只有1.4G;所以如果有大量缓存数据,最好的办法是移除node之外,使用redis处理;如果有1个G的文件需要给前端下载怎么办呢?流式永远是最好的解决方案,对于node,不,对于所有后台开发来说,节省内存最好的办法就是流式,流的作用就是读多少传多少,读1M数据传1M数据给前端,大大减轻了V8内存的负担

  • 为什么不将V8内存设置很大?

该方案,确实是一个解决方案;但是V8的各种垃圾回收算法同时也会降低效率(虽然底层会并发清理,但大内存空间消耗的时间一定是成正比的),本文不会对V8垃圾回收机制展开讲解,感兴趣的同学可以搜相关的只是:新生代的Scavenge算法(from-to通过空间换时间),老生代的Mark-Sweep(标记扫除)Mark-Compact(标记压缩)时间换空间做法,V8确认一个数据需要被垃圾回收而又不影响其他堆数据的使用三色标记法(增量标记、强三原色、写屏障这些来保证一个数据被回收而不影响应用正常运行)

为什么node有了传统的stream又弄出一个web stream模块

  • 此web的含义对应的是前端,而不是web TCP双工流
  • 该流行为与前端(即浏览器内的流行为、api一致)
  • MDN参考前端流:https://developer.mozilla.org/zh-CN/docs/Web/API/Streams_API

readable可读流

  • 基本使用
const { ReadableStream } = require("node:stream/web");
const { setInterval, setTimeout: timer } = require("node:timers/promises");
const { performance } = require("node:perf_hooks");
const { Buffer } = require("node:buffer");

const readable = new ReadableStream({
  // 开始事件
  async start(controller) {
    console.log("start.");
  },

  // 当内置队列未满时,一直读取,如果为异步则等待异步完成后再次调用
  async pull(controller) {
    await timer(100); // 500ms 读取一次
    const val = performance.now();
    controller.enqueue(val);
    console.log("队列剩余容量", controller.desiredSize);
  },

  // 取消事件 可以通过reader.cancel()方法取消流pull读取事件
  cancel(reason) {
    console.log(reason);
  },
},
{
  highWaterMark: 5, // 水平线
  // 根据返回的number大小,水平线 - size返回的大小 = 当前剩余容量(controller.desiredSize)
  size(chunk) {
    return 1;
  },
});

(async () => {
  // 消费5次
  const reader = readable.getReader(); // 默认的reader实例,允许js值(如:对象...)
  for (let index = 1; index <= 5; index++) {
    console.log(await reader.read());
  }

  // 2s后消费3次
  setTimeout(async () => {
    console.log(await reader.read());
    console.log(await reader.read());
    console.log(await reader.read());
  }, 2000);
})();

/*
// 开始事件
start.

// 这块生产消费同时在进行所以,内置队列大小没变
队列剩余容量 5
{ value: 201.76770899817348, done: false }
队列剩余容量 5
{ value: 304.59966699779034, done: false }
队列剩余容量 5
{ value: 406.3125419989228, done: false }
队列剩余容量 5
{ value: 508.1209169998765, done: false }
队列剩余容量 5
{ value: 611.398583997041, done: false }

// 一直读取中
队列剩余容量 4
队列剩余容量 3
队列剩余容量 2
队列剩余容量 1
队列剩余容量 0
// 读取完毕到达阈值(内置队列容量为0)

// 定时器2s,开始消费
{ value: 655.7073750011623, done: false }
{ value: 757.9737910032272, done: false }
{ value: 859.5705410018563, done: false }

// 消费了3个自然要读取3个
队列剩余容量 2
队列剩余容量 1
队列剩余容量 0
*/

writeable可写流

  • 将可读流与可写流连通
const { ReadableStream, WritableStream } = require("node:stream/web");
const { setInterval, setTimeout: timer } = require("node:timers/promises");
const { performance } = require("node:perf_hooks");
const { Buffer } = require("node:buffer");

// 可读流
const readable = new ReadableStream(
  {
    async pull(controller) {
      await timer(500); // 500ms 读取一次
      const val = performance.now();
      controller.enqueue(val);
      console.log("队列剩余容量", controller.desiredSize);
    },
  },
  {
    highWaterMark: 5,
    size(chunk) {
      return 1;
    },
  },
);

// 可写流
const writeable = new WritableStream({
  write(chunk) {
    console.log("写入流接收到的数据", chunk);
  },
});

(async () => {
  const writer = writeable.getWriter();
  // 不使用Reader读取器消费,可以使用for await来进行消费,将读取到的数据写入到写入流里
  for await (const value of readable) {
    writer.write(value);
  }
})();

/*
队列剩余容量 5
写入流接收到的数据 539.5047909989953
队列剩余容量 5
写入流接收到的数据 1051.5886659994721
队列剩余容量 5
写入流接收到的数据 1553.0724160000682
队列剩余容量 5
写入流接收到的数据 2055.640707999468
队列剩余容量 5
写入流接收到的数据 2558.0102079994977
... // 一边生产一边消费
*/

MDNfor await of异步迭代器:https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Statements/for-await…of

ReadableStream支持异步迭代器:https://nodejs.org/docs/latest-v18.x/api/webstreams.html#async-iteration

readable结合writeable测试背压

  • 内置了背压,到达阈值生产者会停止读取,等待消费者消费结束
const { ReadableStream, WritableStream } = require("node:stream/web");
const { setTimeout: timer } = require("node:timers/promises");
const { performance } = require("node:perf_hooks");

// 可读流
const readable = new ReadableStream(
  {
    async pull(controller) {
      await timer(100); // 100ms 读取一次
      const val = performance.now();
      controller.enqueue(val);
      console.log("队列剩余容量", controller.desiredSize);
    },
  },
  {
    highWaterMark: 5,
    size(chunk) {
      return 1;
    },
  },
);

// 可写流 1s钟读取一次
const writeable = new WritableStream({
  async write(chunk, controller) {
    await timer(1000);
    console.log("写入流接收到的数据", chunk);
  },
});

(async () => {
  // 效果:当reader读完满内置队列之后,writer只有写入完成后,reader才会继续读,强制当水平线
  const writer = writeable.getWriter();
  for await (const value of readable) {
    await writer.write(value);
  }
})();

/*
// 生产者读取的很快
队列剩余容量 5
队列剩余容量 4
队列剩余容量 3
队列剩余容量 2
队列剩余容量 1
队列剩余容量 0
// 生产者读取到达阈值,停止读取
写入流接收到的数据 141.93745799735188 // 消费成功
队列剩余容量 0 // 消费一个,读一个
写入流接收到的数据 249.89816699922085
队列剩余容量 0
写入流接收到的数据 351.6370829977095
队列剩余容量 0
写入流接收到的数据 453.2700829990208
队列剩余容量 0
...
*/

Transform双工转换流

  • 这个玩意就比较牛了,不仅是个双工流(即能读也能写如TCP网络流),还可以进行转换
const { TransformStream } = require("node:stream/web");

const transform = new TransformStream(
  {
    // 可写流写入出发转换过程
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },

    // 写入流关闭执行
    flush(controller) {
      console.log("写入流关闭!");
    },
  },
  // 可写流阈值配置
  {
    highWaterMark: 5,
    size() {
      return 1;
    },
  },
  // 可读流阈值配置
  {
    highWaterMark: 5,
    size() {
      return 1;
    },
  },
);

(async () => {
  const writer = transform.writable.getWriter();
  const reader = transform.readable.getReader();
  await writer.write("abc");
  const value = await reader.read();
  console.log(value);
  writer.close();
})();

/*
{ value: 'ABC', done: false }
写入流关闭!
*/

web stream与传统的流的区别

  • highwatermark水平线:web stream对于阈值强制要求,小于等于阈值 <= 内置队列容量将停止读取,反之继续读取

相较于传统的stream,web stream内置了背压机制

  • 如果不用pipe()管道,也无需刻意注意背压机制,web stream底层已经帮我们处理,即highwatermark为强制背压的水平线
  • 将可读流与可写流定义好后,使用管道、或者手动写入读取,这块业务部分与流内部工作解耦
  • 与前端web流式相兼容,前后端统一

官方提供了一些工具流

  • 这些工具流在我的mac node 18.12.0上不起作用,猜测原因可能是未开发完毕且这些工具流完全可以自己用Transform双工转换流来实现,无伤大雅。

参考文献

  • MDN流API文档:https://developer.mozilla.org/zh-CN/docs/Web/API/Streams_API

  • Nodejs官方web stream API文档:https://nodejs.org/docs/latest-v16.x/api/webstreams.html#class-readablestream

  • 内置队列MDN:https://developer.mozilla.org/zh-CN/docs/Web/API/Streams_API/Concepts#%E5%86%85%E7%BD%AE%E9%98%9F%E5%88%97%E5%92%8C%E9%98%9F%E5%88%97%E7%AD%96%E7%95%A5

  • Node通过传统流的攻击手段:https://nodejs.org/docs/latest-v18.x/api/stream.html#writablewritechunk-encoding-callback

  • MDNfor await of异步迭代器:https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Statements/for-await…of

  • ReadableStream支持异步迭代器:[https://nodejs.org/docs/latest-v18.x/api/webstreams.html#async-iteration](

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

深入理解node的web stream模块 的相关文章

  • Node.js 可读流_read用法

    我了解如何在 Node 的 new 中使用可写流Streams2库 但我不明白如何使用可读流 举个例子 一个流包装器围绕dgram module var dgram require dgram var thumbs twiddle func
  • Nodejs 中的 tail-stream 模块不打印文件的最后一条记录

    我正在使用 tail stream 从 csv 文件获取数据 并将每个 csv 记录转换为 json 格式并打印它 但是尾流不会打印文件的最后一行 而是将其保留为缓冲区 如果我更新文件 则从上一个最后一行 缓冲的最后一行 到更新的最后一行
  • 找不到模块:错误:包路径。未从包中导出

    import firebase from firebase const firebaseConfig apiKey AIzaSyBOK7x5N5UnjY4TDqndzH7l5tvdNIsWFRc authDomain todo app e3
  • 在 Windows 上以 QML 播放 RTSP 视频

    我正在尝试将 QML 中的 RTSP 流播放到视频标签中 如下所示 Repeater model 8 Video Layout fillWidth true Layout fillHeight true fillMode VideoOutp
  • 是否可以检测流是否已被客户端关闭?

    简要介绍一下情况 我有一项服务可以通过套接字接收信息并发送回复 连接不安全 我想设置另一个可以为这些连接提供 TLS 的服务 这个新服务将提供单个端口并根据提供的客户端证书分发连接 我不想使用 stunnel 有几个原因 其中之一是每个接收
  • 将您的应用程序链接到现有页面

    我搜索了又搜索 似乎找不到任何与此相关的信息 我们有一个 Facebook 页面 facebook com companyname 我们在 Facebook 上也有一个应用程序 apps facebook com companyname 我
  • 如何用 C 语言通过 HTTP 协议发送图像?

    我是一名正在做网络服务器练习的学生 我需要一些帮助 我的网络服务器在文本页面上运行良好 但是每当浏览器发送一个 GET img jpg HTTP 1 1请求 我不知道如何处理 我听说 HTTP 协议是基于文本的 那么如何在 HTTP 响应中
  • Java 中序列化的目的是什么?

    我读过很多关于序列化的文章 以及它如何如此美好和伟大 但没有一个论点足够令人信服 我想知道是否有人能真正告诉我通过序列化一个类我们真正可以实现什么 让我们先定义序列化 然后我们才能讨论它为什么如此有用 序列化只是将现有对象转换为字节数组 该
  • ASP.net获取硬件信息

    如果我创建一个 ASP net 页面 我是否能够获取当前用户的 CPUID 和 BIOS 序列号 还是出于安全原因不允许这样做 我目前有一个获取这些值的 Visual Basic net 应用程序 我只是想知道是否可以在网页上执行相同的操作
  • 从动态服务器中抓取 html 列表数据

    哈喽大家好 抱歉提出转储问题 这是我最后的手段 我发誓我尝试了无数其他 Stackoverflow 问题 不同的框架等 但这些似乎没有帮助 我有以下问题 一个网站显示一个数据列表 前面有大量的 div li span 等标签 它是一个很大的
  • 如何从网页中提取文本内容? [关闭]

    很难说出这里问的是什么 这个问题是含糊的 模糊的 不完整的 过于宽泛的或修辞性的 无法以目前的形式得到合理的回答 如需帮助澄清此问题以便重新打开 访问帮助中心 help reopen questions 我正在用java开发一个应用程序 它
  • Drupal 模板/主题资源或建议?

    我有兴趣为我正在开发的 Drupal 网站创建自定义主题 我是 Drupal 的新手 但是我在处理构建主题 CSS PHP HTML 所需的基本概念方面拥有相当多的经验 所以 我的问题是 我从哪里开始 有创建 Drupal 主题的规范指南吗
  • Android - Java - 发送 facebook 聊天消息的意图(facebook 禁用 xmpp)

    Facebook 已弃用 xmpp API 有没有办法打开意图 或将数据传递到fb 以在Android设备上发送聊天消息 设备上安装的 Facebook 和 Messenger 应用 谢谢 您需要将 uri 传递给意图 这里10000572
  • 如何修复/解决 java.lang.reflect.InitationTargetException

    我有一个关于一个特别烦人的错误的问题 我一直无法弄清楚 更不用说克服了 每当我尝试在网站上运行 Java 小程序 Applet 或 JApplet 时 都会弹出此错误 java lang reflect InvocationTargetEx
  • 流多播 - 读取一次流,但以不同的方式处理它,并使用最少的缓冲

    为了可扩展性和节省资源 最好避免将整个输入流读入内存 而是尝试将其作为流处理 一次读取小块 当您想要对数据执行一件事 例如从 Web 请求中读取数据并将其保存到文件中 时 这在 NET 中很容易实现 简单的例子 input CopyTo o
  • 我应该使用哪个命令来缩小和优化 Nodejs Express 应用程序?

    我已经准备好 Express generator sccafold 网站并需要发布它 我应该使用哪个命令来缩小文件并优化发布 另外 我应该上传哪些目录 express generator是一个基于express框架的服务端渲染框架 而不是像
  • 直接从 Javascript 代码访问 SVG 文件

    我有这个 HTML 代码 它调用我的 javascript 代码 该代码用于仪表 在 javascript 代码中 我尝试访问 SVG 文件 并修改 仪表的 指针以显示所需的值 该代码运行良好 但是 我不想在 HTML 中调用 对象 id
  • 开发工具在表达式上中断

    当给定的 Javascript 表达式为 true 时 我想暂停一切 我见过条件断点 但这对我不起作用 因为这需要首先在某处设置断点 或者首先基于某个标准 然后向其添加附加条件 相反 我想要的是能够在给定表达式为真时中断 无论在哪里 在 D
  • 节点:使用 Nodemailer 的直通流

    我正在使用officegen 生成一个Word 文档 然后计划使用Nodemailer 和Sendgrid 将其附加到电子邮件中 Officegen 输出一个流 但我更愿意将其直接传递到附件 而不是在本地保存 Word 文档然后附加它 Ge
  • 在 C# 中使用(IDisposable obj = new ...) 在流中写入代码块(例如 XML)

    我已经开始使用实现 IDisposable 的类通过 using 语句在流中写入块 这有助于保持正确的嵌套并避免丢失或错误放置开始 结束部件 基本上 构造函数写入块的开头 例如打开 XML 标签 Dispose 写入结束 例如关闭 XML

随机推荐