RXJS中异步流是如何传输的?

2024-01-09

我试图了解流是如何通过 RXjs 中的管道传输的。
我知道这不应该成为一个问题,因为这就是异步流的整个想法 - 但仍然有一些我想理解的事情。

看这段代码:

var source = Rx.Observable
    .range(1, 3)
    .flatMapLatest(function (x) {  //`switch` these days...
        return Rx.Observable.range(x*100, 2);
    });


 source.subscribe(value => console.log('I got a value ', value))

Result :

I got a value 100
I got a value 200
I got a value 300
I got a value 301

我相信(IIUC)该图是这样的:(通知已取消订阅 101,201)

----1---------2------------------3------------------------------|

░░░░░░░░flatMapLatest(x=>Rx.Observable.range(x*100, 2))░░░░░░░░
-----100-------(-̶1̶0̶1̶)-------200---(-̶2̶0̶1̶)-----300------301-------------

这是问题:

问题:

是否总是保证 2 会在 (101) 之前到达?与 (201) 之前到达的 3 相同?

我的意思是 - 如果我不打算查看时间线,那么出现下图是完全合法的:

----1---------------2---------------3------------------------------|

░░░░░░░░flatMapLatest(x=>Rx.Observable.range(x*100, 2))░░░░░░░░
-----100-------101------200---201-----300------301-------------

Where 2到达时略有延迟,其中 101 已发出

我在这里缺少什么?这里的管道是如何工作的?


对于具有特定 RxJS 版本的特定 Observable 链,排放顺序将始终相同。

正如已经提到的,在 RxJS 4 中它使用currentThread调度程序,如下所示:https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/perf/operators/range.js#L39 https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/perf/operators/range.js#L39.
所有调度程序(除了immediate来自 RxJS 4) 是内部使用某种类型的队列 https://github.com/Reactive-Extensions/RxJS/blob/master/src/modular/scheduler/currentthreadscheduler.js#L29所以顺序总是一样的。

事件的顺序与您在图中显示的非常相似(......或者至少我认为是这样):

  1. 1被调度并发出,因为它是队列中唯一的操作。
  2. 100已安排。此时调度程序的队列中没有更多操作,因为2尚未安排。这RangeObservable 在调用后递归地安排另一个发射onNext() https://github.com/Reactive-Extensions/RxJS/blob/master/src/modular/observable/range.js#L19。这意味着100被安排在之前2.
  3. 2已安排 https://github.com/Reactive-Extensions/RxJS/blob/master/src/modular/observable/range.js#L20.
  4. 100被发射,101已安排
  5. 2被发射,101被处置。
  6. ... 等等

请注意,这种行为在 RxJS 4 和 RxJS 5 中是不同的。

在 RxJS 5 中,大多数 Observables 和运算符默认不使用任何 Scheduler(一个明显的例外是需要处理延迟的 Observables/operator)。所以在RxJS 5 的RangeObservable不会安排任何事情 https://github.com/ReactiveX/rxjs/blob/9ec444e1a053a882d714b9cb4bc92c898152e4b7/src/observable/RangeObservable.ts#L89并立即开始循环发出值。

RxJS 5 中的相同示例将产生不同的结果:

const source = Observable
  .range(1, 3)
  .switchMap(function (x) {
    return Observable.range(x * 100, 2);
  });

source.subscribe(value => console.log('I got a value ', value));

这将打印以下内容:

I got a value  100
I got a value  101
I got a value  200
I got a value  201
I got a value  300
I got a value  301

但是,如果您添加例如delay(0)。常识表明这不应该做任何事情:

const source = Observable
  .range(1, 3)
  .switchMap(function (x) {
    return Observable.range(x * 100, 2).delay(0);
  });

source.subscribe(value => console.log('I got a value ', value));

现在只剩下内在RangeObservable被多次调度和处理,这使得它只发出最后一次的值RangeObservable:

I got a value  300
I got a value  301
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RXJS中异步流是如何传输的? 的相关文章

随机推荐

  • 为 C/FFI 库调用分配对象

    我有一个 C 库 其中有 gpio 实现 gpio type 是特定于目标的 每个 MCU 对 gpio type 都有不同的定义 库中的函数之一 void gpio init gpio type object int32 t pin 我想
  • AttributeError:模块“tensorflow.compat”没有属性“v1”Tensorflow v:1.10.0

    当我尝试训练我的模型时 我遇到了这个错误 tensorflow1 C tensorflow1 models research object detection gt python train py logtostderr train dir
  • discord.py 的颜色代码

    我发现在discord py 中更改颜色 例如嵌入颜色 有点困难和烦人 我为不同的颜色代码创建了一个类 以便在discord py 中使用 可以将其导入到主文件中 class colors default 0 teal 0x1abc9c d
  • 为什么我只显示 1 个 JButton?

    我不明白为什么只有 登录现有帐户 是唯一显示的按钮 我想要的只是显示 2 个按钮 并且即使我将其可见性设置为 true 并将其移动 使其不与 登录到现有帐户 重叠 创建帐户 也不会显示 public class HotelBookingSy
  • 如何使用java正则表达式捕获带括号的组

    我有一些像这样的字符串 a b c d 并希望使用 java 正则表达式捕获带括号的组 我以为这个简单的正则表达式 Pattern p Pattern compile Pattern DOTALL 会做这项工作 但它没有 这有什么问题吗 不
  • 将 php curl 转换为 GAE urlfetch 以用于 iTunes InApp verifyReceipt

    有人可以帮忙将此 PHP Curl 转换为 UrlFetch 吗 这用于 Apple iTunes verifyReceipt if getiTunesProductionLevel app id sandbox sandbox overr
  • Android 设备上的输入失去焦点

    我在 Android 设备上遇到输入焦点问题 我为移动设备制作了一个画布外菜单 在此菜单中我有一个搜索输入 问题是 在android上 当我点击输入时 它会聚焦 然后键盘出现 然后输入失去焦点 键盘消失 我读到我可能必须更改清单元素中的 w
  • 基于循环内背景颜色变量的条件CSS

    我意识到这是一个与此类似的问题基于背景颜色变量的条件CSS https stackoverflow com questions 21600825 conditional css based on background color varia
  • 为什么 C# 编译器从这段代码创建 PrivateImplementationDetails ?

    我发现下面的代码 public static class MimeHelper public static string GetMimeType string strFileName string retval switch System
  • 以与 python2 和 python3 兼容的方式将字节写入标准输出

    我想要一个返回文件对象的函数 用它我可以将二进制数据写入标准输出 在Python2中sys stdout就是这样一个物体 在python3中是sys stdout buffer 检索此类对象以使其适用于 python2 和 python3
  • 使用 GPUImage 实时过滤视图

    我正在尝试使用 GPUImage 来过滤视图 因为它以一种 iOS 7 样式覆盖更新 为此 我在 NSTimer 上运行以下代码 但是我的 NSLog 显示 self backgroundUIImage imageFromCurrently
  • ListView BaseAdapter 中的 EditText 奇怪行为

    我有一个 ListView 使用填充BaseAdapter 在列表视图项中有一个数字 EditText
  • Angular 2:{{object}} 有效,{{object.child}} 抛出错误

    使用 Angular v1 已有一段时间了 自从 Angular v2 推出 Beta 版以来 我一直在研究它 现在我已经有了这段代码 但无法让它工作 真的不知道为什么 不知何故 当我打印时 profileUser json 一切正常 pr
  • 将 C++ 库与 Objective-C 应用程序链接和使用

    我正在编写一个图形应用程序 使用 Objective C 作为前端 使用 C 进行图形处理和网络通信 我在苹果网站上四处阅读 寻找一种方法来链接 dylib or so将我的 C 代码添加到我的 Xcode 项目中 但似乎没有任何效果 我能
  • 有时 GET 返回 304 而不是 200

    我正在使用 Node js 和 mongodb 我似乎有时会收到 200 有时会收到未修改的 304 router get add to bag id req res next gt req session bag push req par
  • Java JTable 更新行

    我正在创建一个像这样的 JTable String colName new String ID Country Name Page titel Page URL Time Object products new Object 123 USA
  • 为什么 PhoneGap 总是显示默认的启动画面图像?

    我现在正在测试一个示例 iOS 应用程序 尽管我在 Xcode 项目目标屏幕的 摘要 选项卡 中设置了所有新的启动图像 但 PhoneGap 3 0 仍然显示其默认启动屏幕 为什么 即使闪屏是一个插件并且现在默认情况下不包含在 PhoneG
  • 使用 WebFlux 从资源中读取和解析文件的反应方式?

    我想知道从资源中读取 解析和提供文件的正确方法是什么 目前 我做了这样的事情 fun getFile request ServerRequest Mono
  • Android HttpsUrlConnection eofException

    我有一个问题 当我尝试读取任何输入时 我的 HttpsURLConnection 将抛出 EOFException 该代码适用于某些网络调用 但不适用于其他网络调用 如果我尝试从连接中读取任何内容 则会失败并出现上述错误 Example u
  • RXJS中异步流是如何传输的?

    我试图了解流是如何通过 RXjs 中的管道传输的 我知道这不应该成为一个问题 因为这就是异步流的整个想法 但仍然有一些我想理解的事情 看这段代码 var source Rx Observable range 1 3 flatMapLates