RxJS——异步数据流的响应式编程库(适合新手入门)

2023-11-12

RxJS概述

RxJS 全称 Reactive Extensions for JavaScript

RxJS 结合了函数式编程、观察者模式(例如 DOM EventListener)、迭代器模式(例如 ES6 Iterater)

RxJS 官方是这样说的: Think of RxJS as Lodash for events. 把 RxJS 想像成针对 events 的 lodash

RxJS 本质是个工具库,处理的是事件,这里的 events,可以称之为流

那么流是指什么呢?举个例子,代码中每 1s 输出一个数字,用户每一次对元素的点击,就像是在时间这个维度上,产生了一个数据集。这个数据集不像数组那样,它不是一开始都存在的,而是随着时间的流逝,数据一个一个被输出。这种异步行为产生的数据,就可以被称之为一个流。在 RxJS 中,称之为 ovservalbe(抛开英文,本质其实就是一个数据的集合,只是这些数据不一定是一开始就设定好的,而是随着时间而不断产生的)

而 RxJS,就是为了处理这种流而产生的工具,比如流与流的合并、流的截断、延迟、消抖等

Redux VS RxJS

在这里插入图片描述

RxJS核心概念解析

Observable

它的本质其实就是一个随时间不断产生数据的一个集合,称之为流更容易理解。而其对象存在一个 subscribe 方法,调用该方法后,才会启动这个流(也就是数据才会开始产生),这里需要注意的是多次启动的每一个流都是独立的,互不干扰。

Observer

从行为上来看,无非就是定义了如何处理上述流产生的数据,称之为流的处理方法。

Subscribtion

它的本质就是暂存了一个启动后的流,之前提到,每一个启动后的流都是相互独立的,而这个启动后的流,就存储在 subscription 中,提供了 unsubscribe,来停止这个流。

简单理解了这三个名词 observable, observer, subscription 后,从数据的角度来思考:

observable 定义了要生成一个什么样的数据,其 subscribe 方法,接收一个 observer(定义了接收到数据如何处理),并开始产生数据。该方法的返回值 subscription, 存储了这个已经开启的流,同时具有 unscbscribe 方法,可以将这个流停止。

整理成这个公式:

Subscription = Observable.subscribe (observer)

observable: 随着时间产生的数据集合,可以理解为流,其 subscribe 方法可以启动该流

observer: 决定如何处理数据

subscription: 存储已经启动过的流,其 unsubscribe 方法可以停止该流

Subject

它是一个代理对象,既是一个 Observable 又是一个 Observer,它可以同时接受 Observable 发射出的数据,也可以向订阅了它的 observer 发射数据,同时,Subject 会对内部的 observers 清单进行多播 (multicast)

Subjects 是将任意 Observable 执行共享给多个观察者的唯一方式

在这里插入图片描述

Subject 有三个变体:

  • BehaviorSubject是一种在有新的订阅时会额外发出最近一次改变的值的 Subject,需要传入一个参数即初始值(如果没改变则发送初始值)
  • ReplaySubject会保存所有值,然后回放给新的订阅者,需要传入一个参数即初始值,用于控制重放值的数量(默认重放所有)
  • AsyncSubject只有当 Observable 执行完成时 (执行 complete()),才会将执行的最后一个值发送给观察者。如果因异常而终止,AsyncSubject 将不会释放任何数据,但是会向 Observer 传递一个异常通知。

Scheduler

用来控制并发并且是中央集权的调度员,允许在发生计算时进行协调,例如 setTimeoutrequestAnimationFrame 或其他。

调度器是一种数据结构。 它知道如何根据优先级或其他标准来存储任务和将任务进行排序。

调度器是执行上下文。 它表示在何时何地执行任务 (举例来说,立即的,或另一种回调函数机制 (比如 setTimeoutprocess.nextTick),或动画帧)。

调度器有一个 (虚拟的) 时钟。 调度器功能通过它的 getter 方法 now() 提供了 “时间” 的概念。在具体调度器上安排的任务将严格遵循该时钟所表示的时间。

Scheduler调度器能做四种调度:

  • queue:将每个下一个任务放在队列中,而不是立即执行。queue 延迟使用调度程序时,其行为与 async 调度程序相同。当没有延迟使用时,它将同步安排给定的任务 - 在安排好任务后立即执行。但是,当递归调用时(即在已调度的任务内部),将使用队列调度程序调度另一个任务,而不是立即执行,该任务将被放入队列并等待当前任务完成。
  • asap:内部基于 Promise 实现(Node 端采用 process.nextTick),会使用可用的最快的异步传输机制,如果不支持 Promiseprocess.nextTick 或者 Web WorkerMessageChannel 也可能会调用 setTimeout 方式进行调度。
  • async:与 asap 方式很像,只不过内部采用 setInterval 进行调度,大多用于基于时间的操作符。
  • animationFrame:内部基于 requestAnimationFrame 来实现调度,所以执行的时机将与 window.requestAnimationFrame 保持一致,适用于需要频繁渲染或操作动画的场景。

Operators

在这里插入图片描述

RxJS操作符非常多,在此只介绍几个常用的:

  • create

    createonSubscription 函数转化为一个实际的 Observable 。每当有人订阅该 Observable 的时候,onSubscription 函数会接收 Observer 实例作为唯一参数行。onSubscription 应该调用观察者对象的 next, errorcomplete 方法。

    const source = Rx.Observable.create(((observer: any) => {
        observer.next(1);
        observer.next(2);
        setTimeout(() => {
            observer.next(3);
        }, 1000)
    }))
    
    // 方式一
    source.subscribe(
        {
            next(val) {
                console.log('A:' + val);
            }
        }
    );
    // 方式二
    source.subscribe((val) => console.log('B:' + val));
    
    // A:1
    // A:2
    // B:1
    // B:2
    //- 1s后:
    // A:3
    // B:3
    
  • from

    从一个数组、类数组对象、Promise、迭代器对象或者类 Observable 对象创建一个 Observable。该方法就有点像 js 中的 Array.from 方法(可以从一个类数组或者可迭代对象创建一个新的数组),只不过在 RxJS 中是转成一个 Observable 给使用者使用。

    const source = Rx.Observable.from([10, 20, 30]);
    source.subscribe(v => console.log(v));
    
    // 10
    // 20
    // 30
    
  • of

    from 的能力差不多,只不过在使用的时候是传入一个一个参数来调用的,有点类似于 js 中的 concat 方法。同样也会返回一个 Observable,它会依次将你传入的参数合并并将数据以同步的方式发出。

    const source = Rx.Observable.of(1, 2, 3);
    source.subscribe(v => console.log(v));
    
    // 1
    // 2
    // 3
    
  • debounceTime

    功能与 debounce 防抖函数差不多,只有在特定的一段时间经过后并且没有发出另一个源值,才从源 Observable 中发出一个值。

    假设一个数据源每隔一秒发送一个数,而我们使用了 debounceTime 操作符,并设置了延时时间,那么在数据源发送一个新数据之后,如果在延时时间内数据源又发送了一个新数据,这个新的数据就会被先缓存住不会发送,等待发送完数据之后并等待延时时间结束才会发送给订阅者。不仅如此,在延时时间未到的时候并且已有一个值在缓冲区,这个时候又收到一个新值,那么缓冲区就会把老的数据抛弃放入新的,然后重新等待延时时间到达然后将其发送。

    const source = Rx.Observable.interval(1000).take(3);
    const result = source.debounceTime(2000);
    result.subscribe(x => console.log(x));
    
    // 程序启动之后的前三秒没有数据打印,等到五秒到了之后,打印出一个2,接着就没有再打印了
    // 数据源会每秒依次发送三个数 0、1、2,由于我们设定了延时时间为 2 秒,那么也就是说,我们在数据发送完成之前都是不可能看到数据的,因为发送源的发送频率为 1 秒,延时时间却有两秒,也就是除非发送完,否则不可能满足发送源等待两秒再发送新数据,每次发完新数据之后要等两秒之后才会有打印,所以不论我们该数据源发送多少个数,最终订阅者收到的只有最后一个数。
    
  • take

    只发出源 Observable 最初发出的 N 个值 (N = count)

    这个操作符在前面出现了很多次了,还挺常见的,用于控制只获取特定数目的值,跟 interval 这种会持续发送数据的配合起来就能自主控制要多少个值了。

  • skip

    返回一个 Observable, 该 Observable 跳过源 Observable 发出的前 N 个值 (N = count)

    假设这个数据源发送 6 个值,可以使用 skip 操作符来跳过前多少个。

    const source = Rx.Observable.from([1, 2, 3, 2, 4, 3]);
    const result = source.skip(2);
    result.subscribe(x => console.log(x));
    
    // 打印结果为:3、2、4、3,跳过了前面两个数。
    
  • concat

    concatconcatAll效果是一样的,区别在于 concat 要传递参数,参数必须是 Observable

    concat 将多个 observable 串接起来,前一个完成好了再执行下一个。

    const source1 = interval(1000).pipe(take(3));
    const source2 = of(3);
    const source3 = of (4,5);
    const example = source1.pipe(concat(source2,source3))
    example.subscribe({
      next: value => {
        console.log(value);
      },
      error: err => {
        console.log("Error: " + err);
      },
      complete: () => {
        console.log("complete");
      }
    });
    
    // 0
    // 1
    // 2
    // 3
    // 4
    // 5
    // complete
    

热观察和冷观察

在 RxJS 中,有热观察和冷观察的概念。其中的区别:

  • Hot Observable:可以理解为现场直播,我们进场的时候只能看到即时的内容
  • Cold Observable:可以理解为点播(电影),我们打开的时候会从头播放

RxJS 中 Observable 默认为冷观察,而通过 publish()connect() 可以将冷的 Observable 转变成热的

let publisher$ = Rx.Observable.interval(1000).take(5).publish();

publisher$.subscribe(
	data => console.log('subscriber from first minute',data),
	err => console.log(err),
	() => console.log('completed')
)

setTimeout(() => {
    publisher$.subscribe(
        data => console.log('subscriber from 2nd minute', data),
        err => console.log(err),
        () => console.log('completed')
    )
}, 3000)

publisher$.connect();
// 第一个订阅者输出的是0,1,2,3,4,而第二个输出的是3,4,此处为热观察

热观察和冷观察根据具体的场景可能会有不同的需要,而 Observable 提供的缓存能力也能解决不少业务场景。

例如,如果我们想要在拉群后,自动同步之前的聊天记录,通过冷观察就可以做到。

merge/combine合流

一般来说,合流有两种方式:

// merge
--1----2-----3--------4---
----a-----b----c---d------
           merge
--1-a--2--b--3-c---d--4---

// combine
--1----2-----3--------4---
----a-----b-----c--d------
         combine
--1a-2a-2b-3b-3c-3d-4d--

merge 的合流方式可以用在聊天室、多人协作、公众号订阅就可以通过这样的方式合流,最终按照顺序地展示出对应的操作记录。

在 Excel 中,通过函数计算了 A1 和 B2 两个格子的相加。这种情况下可以使用 combine 合流:

const streamA1 = Rx.Observable.fromEvent(inputA1, "input"); // 监听 A1 单元格的 input 事件
const streamB2 = Rx.Observable.fromEvent(inputB2, "input"); // 监听 B2 单元格的 input 事件

const subscribe = combineLatest(streamA1, streamB2).subscribe((valueA1, valueB2) => {
	// 从 streamA1 和 streamB2 中获取最新发出的值
    return valueA1 + valueB2;
});
// 获取函数计算结果
observable.subscribe((x) => console.log(x));

在一个较大型的前端应用中,通常会拆分成渲染层、数据层、网络层、其他服务等多个功能模块。

虽然服务按照功能结构进行拆分了,但依然会存在服务间调用导致依赖关系复杂、事件触发和监听满天飞等情况。这种情况下,只能通过全局搜索关键字来找到上下游数据流、信息流,通过节点和关键字搜索才能大概理清楚某个数据来源哪里。

如果使用了响应式编程,我们可以通过各种合流的方式、订阅分流的方式,来将应用中的数据流动串在一起。这样,我们可以很清晰地当前节点上的数据来自于哪里,是用户的操作还是来自网络请求。

RXJS6 的变化

RXJS6 改变了包的结构,主要变化在 import 方式和 operator 以及使用 pipe ()

Imports 方式改变
在这里插入图片描述

从 rxjs 中类似像导入 observable subject 等的不再进一步导入,而是止于 rxjs, rxjs6 在包的结构上进行了改变

operator 的改变

在这里插入图片描述

总而言之: 类似于创建之类的用的 API 都是从 rxjs 引入的,类似于 map 之类的操作都是从 rxjs/operators 引入的

在这里插入图片描述

pipeable observable

在这里插入图片描述

被重新命名的 API

在这里插入图片描述

在这里只是简单介绍 RxJS ,帮助新手快速了解 RxJS ,更详细的 RxJS 学习可以查看官方文档

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RxJS——异步数据流的响应式编程库(适合新手入门) 的相关文章

  • Angular:DOM更新后调用方法

    我正在从 html 调用一个方法 调用休息服务 来增加 减少屏幕上的计数 现在我想调用另一个方法 即 getThreshold 来检查计数是否达到阈值 如果是 我想显示一条确认消息 我想首先更新屏幕上的计数 然后调用该函数来检查它是否达到阈
  • 更改 ag-grid 上的页面和缓存块大小会导致项目无限加载

    我希望使用 ag grid 的 服务器端 模式重新获取每个页面的数据 为了做到这一点 我将 maxBlocksInCache 1 和 cacheBlockSize 设置为等于每页的项目数 到这里为止一切正常 现在 当我更改每页的项目数时 网
  • 在 Angular 中显示 formControlName 的验证消息

    我有以下 Angular 注册表单
  • 从 Angular 2 动态表单的 API 设置值

    我正在尝试开始在 Angular 2 中创建动态表单 并且我正在使用 Angular 食谱中的设置here https angular io docs ts latest cookbook dynamic form html作为我的起点 我
  • Angular2 中 Http 的 Promise 与 Observable? [复制]

    这个问题在这里已经有答案了 本质上 正如标题所说 是否有任何理由使用可观察的承诺 https stackoverflow com questions 37364973 angular 2 promise vs observable为了进行
  • 如何将 zingchart 实现到 Angular2 中

    我有一个现有的项目 我想在其上实施 zingcharts 我尝试了 3 个不同的教程 主要来自 https blog zingchart com 2016 07 19 zingchart and angular 2 charts back
  • Angular 2\4 哈希 url 保留index.html

    背景 Angular 4 ng cli RouterModule useHash true 当我使用浏览到我的应用程序时http server index html它决定http server url 中省略了index html 此外 每
  • Angular 中的文件输入事件类型

    所以我已经使用 Angular 和 Typescript 很长时间了 我似乎无法找出输入文件的类型是什么 例如
  • Angular2通用部署到apache远程服务器

    我在将 Web 应用程序部署到 Apache 远程服务器时遇到问题 我已经通过以下步骤部署了一个使用 RESTful API 服务 用 PHP 编写 托管在 public html api 的标准 Angular2 Web 应用程序 在项目
  • Angular 2 runOutsideAngular 仍然改变 UI

    从我的理解来看runOutsideAngular https angular io docs ts latest api core index NgZone class html runOutsideAngular anchor 如果我需要
  • 如何在 Angular @Input 中仅接受预定义值

    我的问题是 我在 a 中收到一个字符串值作为组件的参数 但我想限制可以用作参数的值 就像enum I use Input type string 但是在组件中 一切都可以引入type正如我之前所说 我需要将其限制为 3 个选项 例如Enum
  • 如何通过自定义指令动态添加组件

    我想编写一个自定义指令 它将根据 div 内的某些逻辑动态添加组件 我在其中使用了自定义指令 我尝试使用 componentFactoryResolver 和 viewContainerRef createComponent 动态添加组件
  • 如何将 Laravel 5.4 与 Angular 4 集成

    我知道如何创造完整的拉拉维尔 5 4自己的项目 我也知道如何使用创建 SPA角4 Problem 我不知道如何将 Laravel 与 Angular 集成 另外 我想使用 Laravel 5 4 作为后端 使用 Angular 4 作为前端
  • D3、TS 和 Angular 2

    我正在尝试将 D3 v4 与 Angular 2 Typescript 一起使用 我目前正在研究 D3 v4 我能够遵循 stackoverflow 中类似问题的一些答案 但没有成功 我已经导入了大部分 D3 库及其类型 我使用的是 TS
  • 在指令中动态添加 *ngIf

    如何动态地将 ngIf 添加到用属性指令修饰的元素 为了一个简单的实验 我尝试了这个 Directive selector lhUserHasRights export class UserHasRightsDirective implem
  • Karma 单元测试 / STORE - 状态未定义

    运行应用程序时一切正常 但在帐户单元测试中似乎没有启动或我的状态已启动 我有什么明显做错的事情吗 这是错误 Test error index js 中的创建选择器返回一个带有未定义参数的函数 但仅在 karma 测试期间返回 账户 comp
  • Angular7:NullInjectorError:没有 FormGroup 的提供者

    我真的很沮丧 因为我不知道发生了什么 今天早上一切正常 就在我进行一些更改以将 ReactiveForm 中的 2 个表单合并在一起之前 现在我在浏览器中收到以下错误 错误 StaticInjectorError AppModule For
  • 在 Angular 5 中,如何从父组件访问动态添加的子组件?

    我正在开发一个 Ionic Angular 5 项目 我需要动态加载一些组件 继动态组件加载器示例 https angular io guide dynamic component loader在 Angular 文档中 我能够成功加载组件
  • 为什么我不能在 Angular 模板中定义内联函数?还能怎样做呢?

    我正在使用 Angular 2 而且是新手 我想为单击按钮调用一个小函数 所以我尝试这样做 也许是因为我来自 React 背景
  • 将 UMD Javascript 模块导入浏览器

    你好 我正在对 RxJS 进行一些研究 我可以通过在浏览器中引用它来使用该库 如下所示 它使用全局对象命名空间变量 Rx 导入 我可以制作可观察的东西并做所有有趣的事情 当我将 src 更改为指向最新的 UMD 文件时 一切都会崩溃 如下所

随机推荐

  • linear-gradient为啥只能background不能background-color

    background 可以设置 背景颜色 背景图片 定位等 而background color 只能设置 背景颜色 设置background color aaa 此时仅仅改专变了背景色 但此时有一个默认的的background repeat
  • 什么是OAuth

    什么是OAuth OAuth 全称 Open Authorization 中文翻译开放授权 是一种基于令牌的身份验证 允许组织跨第三方服务共享信息 而无需公开用户的用户名 密码 本质上 OAuth是为第三方服务提供令牌的中间人 该令牌只允许
  • android按日期函数查询,Android开发中SQLite存储时间和按日期函数查询

    Android开发中SQLite存储时间和按日期函数查询 Android开发中SQLite存储时间和按日期函数查询 在Android开发中 在消息模块中 消息做数据库缓存处理 排序查询或指定时间查询需要用时间和日期函数 本篇简括 存储时间字
  • QT 完美实现圆形按钮

    QT 版本 5 6 0 官方的按钮有些普通 如果我们想要换成自己喜欢的按钮而却无从下手 那么请继续往下阅读 皮一下 首先 可以在网络上搜索一下自己喜欢的按钮图形 或者可以自行绘制 我以下面的图形为例 开始制作 一 建立 QT 工程 并加入图
  • 基于画布canvas进行图片压缩

    Canvas 压缩图片的原理主要是通过重新绘制图片 调整图片质量或大小来达到压缩图片的目的 具体实现步骤如下 1 使用 JavaScript 中的 Image 对象将图片加载到内存中 var img new Image img src im
  • mkp勒索病毒怎么处理

    目录 前言 简介 一 mkp勒索病毒的特征 二 mkp后缀勒索病毒是如何传播的 三 如何预防与处理mkp勒索病毒攻击 前言 简介 当今 勒索病毒已成为企业网络安全的一大威胁 而其中mkp勒索病毒则是一种新近出现的变种 与其他勒索病毒一样 m
  • 关于redis密码

    如何更改密码 直接配置文件里更改 配置文件里开放 requirepass 之后客户端更改 用Redis命令查询密码 可以使用以下Redis命令来查询密码 config get requirepass 得到的结果第一行固定是requirepa
  • 设计模式-备忘录模式(Memento Pattern)

    文章目录 前言 一 备忘录模式的概念 二 备忘录模式的实现 三 备忘录优缺点 优点 缺点 总结 前言 备忘录模式 Memento Pattern 是一种行为型设计模式 它用于捕获和存储对象的内部状态 以便在以后可以恢复到先前的状态 备忘录模
  • pthread_detach函数

    int pthread detach pthread t thread 成功 0 失败 错误号 作用 从状态上实现线程分离 注意不是指该线程独自占用地址空间 线程分离状态 指定该状态 线程主动与主控线程断开关系 线程结束后 不会产生僵尸线程
  • jwt编码解码

    import jwt 创建 JWT payload user id 1234 secret key your secret key 密钥 用于签名和验证 algorithm HS256 签名算法 token jwt encode paylo
  • NPM导入模块报错

    npm WARN enoent ENOENT no such file or directory open C Program Files nodejs package json 找不到package json文件 一般情况下npm安装时都
  • python 删除文件、清空目录的方法总结

    Python os remove 方法 os remove 方法用于删除指定路径的文件 如果指定的路径是一个目录 将抛出OSError 在Unix Windows中有效 以下实例演示了 remove 方法的使用 usr bin python
  • python---面向对象(一)

    类和对象 面向对象编程的2个非常重要的概念 类和对象 对象是面向对象编程的核心 在使用对象的过程中 为了将具有共同特征和行为的一组对象抽象定义 提出了另外一个新的概念 类 类就相当于制造飞机时的图纸 用它来进行创建的飞机就相当于对象 类是抽
  • Visual Prompt

    始于NLP 简单来讲 Prompt就是对原来的输入文本进行一定的处理 使得在不改变预训练模型参数的情况下 相应任务的性能变高 例如 原输入文本为 I received the offer from ETH 对于文本分类 我们将其修改为I r
  • cassandra ssdb mongodb

    IM系统 数据量大了mongodb性能有瓶颈 cassandra ssdb 配合使用来搞IM 写扩散 其实是双写 历史消息走cassandra ssdb保留7天的离线消息 cassandra ssdb mongodb
  • 简单的解压缩算法(华为od考试)

    题目描述 现需要实现一种算法 能将一组压缩字符串还原成原始字符串 还原规则如下 1 字符后面加数字N 表示重复字符N次 例如 压缩内容为A3 表示原始字符串为AAA 2 花括号中的字符串加数字N 表示花括号中的字符重复N次 例如压缩内容为
  • 12面魔方公式图解法_【高级篇】(三)三阶魔方CFOP高级玩法之——F2L

    一 F2L这一步要干什么 1 先了解一下 棱角对 和 槽位 的概念 棱角对 即由一个棱块和一个角块构成 是F2L的基本单元 共四组 槽位 给 棱角对 预留的位置 即 棱角对 最后需要插入的地方 棱角对 和 槽位 的概念 2 棱角对 是如何插
  • 在 Android Studio 2.2 中愉快地使用 C/C++

    使用 Android studio 你可以将 C 和 C 代码编译成 native library 然后打包到你的 APK 中 你的 Java 代码可以通过 Java Native Interface JNI 调用 native libra
  • Lite Git (II) - Initialize

    Lite Git II Initialize 前言 本专栏名为Lite Git 主要想与Pro Git对应 后者为Git官方指南 有兴趣 或者想了解更多细节的同学 请移步官网下载PDF版 本专栏主要为了让初出茅庐的同学更快 更合理地掌握Gi
  • RxJS——异步数据流的响应式编程库(适合新手入门)

    文章目录 RxJS概述 Redux VS RxJS RxJS核心概念解析 热观察和冷观察 merge combine合流 RXJS6 的变化 RxJS概述 RxJS 全称 Reactive Extensions for JavaScript