Flux 和 Mono 中的 compose()、transform()、as()、map()

2024-01-19

最近,我决定尝试 spring 5项目反应堆.io http://projectreactor.io/docs(io.projectreactor:3.1.1)。

有谁知道使用此功能的最佳情况是什么?使用它们各自的优缺点以及应该在哪里使用它们?

好的例子会有帮助。


这里有两种截然不同的运算符:

从事以下工作的运营商Flux itself

transform and transformDeferred用于代码交互

当您定期编写运算符链并且应用程序中具有常见的运算符使用模式时,您可以使用此代码互用或为其指定更具描述性的名称transform and transformDeferred.

两者的区别在于when应用互化运算符:transform在实例化时应用它们,同时transformDeferred在订阅时应用它们(允许动态选择添加的运算符)。

看看参考文档 http://projectreactor.io/docs/core/release/reference/docs/index.html#advanced-mutualizing-operator-usage了解更多详细信息和示例。

note: transformDeferred被称为compose3.3.0之前的版本

as

这是应用一个方便的快捷方式Function对整体Flux同时保持整个代码的流畅风格。

The 主要区别在于transform*运营商是这个不强制执行特定的返回类型。这一切都是由Function您使用,例如可以用于测试StepVerifier以流畅的风格:

Flux.just("test")
    .map(String::length)
    .as(StepVerifier::create)
    //from there on we're dealing with the StepVerifier API
    .expectNext(4)
    .verifyComplete();

javadoc 中显示的示例使用此方法转换为Mono using Mono::from,这有点令人困惑,因为返回类型非常接近Flux.

请注意,此方法还可以帮助以工厂方法样式实现的外部运算符来“扩展”Flux API

Take reactor-addons MathFlux例如,并比较:

MathFlux.sumInt(Flux.range(1, 10)
                    .map(i -> i + 2)
                    .map(i -> i * 10))
        .map(isum -> "sum=" + isum);

To:

Flux.range(1, 10)
    .map(i -> i + 2)
    .map(i -> i * 10)
    .as(MathFlux::sumInt)
    .map(isum -> "sum=" + isum)

(这可以帮助您处理这样一个事实:与 Kotlin 不同,Java 没有扩展方法:))

处理经过的数据的运算符Flux

map一切都与数据有关。当源中的每个元素可用时,它将 1-1 转换函数应用于它们。

在上面的 MathFlux 示例中,map依次用于将每个原始整数加 2,然后再次将序列中的每个数字乘以 10,最后第三次生成String从每笔金额中。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flux 和 Mono 中的 compose()、transform()、as()、map() 的相关文章

  • 如何正确读取 Flux 并将其转换为单个 inputStream

    我在用着WebClient和定制BodyExtractor我的 spring boot 应用程序的类 WebClient webLCient WebClient create webClient get uri url params acc
  • 在反应性香蕉中进行测试

    有没有办法对用反应性香蕉创建的网络进行单元测试 假设我已经使用一些输入事件建立了一些网络 是否可以验证事件已产生一些输出流 行为在一定数量的输入事件之后具有一定的价值 这样做有意义吗 我注意到有各种interpret 功能 但似乎无法弄清楚
  • 处理 Reactor 中的并联通量

    我已经从 iterable 创建了一个并行通量 对于每个可迭代 我都必须进行休息调用 但是在执行时 即使任何一个请求失败 所有剩余的请求也会失败 我希望所有的请求都能被执行 无论失败或成功 我目前正在使用 Flux fromIterable
  • 仅当其中一个流发生更改时,combineLatest 才会发出

    我有一个具有频繁值的流和一个具有较慢值的流 我想将它们组合起来 但仅当较慢的发出时才发出一个值 所以combineLatest不起作用 就像这样 a1 a2 b1 a2 b1 a3 a4 a5 b2 a5 b2 目前我正在这样做 有没有更干
  • 使用 slick/scala 进行流式传输

    我正在研究 scala slick 流 并试图了解它是如何工作的 这是我的测试代码 val bigdata TableQuery BigData val x db stream bigdata result transactionally
  • RxSwift:管理应用程序中的对象更新

    我非常关心如何管理对象属性的更改 假设我有一个 汽车 类 其中包含一些属性 例如 名称 日期 价格 等 在我的视图 A 中 我正在显示从 API 检索的所有汽车 在视图 B 和 C 中 我可以显示和编辑有关视图 A 中所选汽车的特定信息 假
  • 如何将 BlockHound 添​​加到 Spring Boot 应用程序以检测阻塞调用?

    如何将 BlockHound 添 加到 Spring Boot 应用程序以检测阻塞调用 我没有找到任何 Spring Boot 应用程序的示例 https github com reactor BlockHound blob master
  • Spring Reactive Webclient 的请求级背压?

    这类似于akka http请求级反压怎么做 https stackoverflow com questions 46738696 how to do akka http request level backpressure但对于 Sprin
  • 如何为流数据创建 Flux/Publisher

    我正在使用轮询方法定期获取数据 新数据可能随时到达 我想向我的客户公开一个反应式接口 因此 我想创建一个发布者 Flux 它会在新数据可用时发布新数据并通知订阅者 我怎么做 我看到的所有 Flux 示例都是针对数据已知 可用的情况 实际上
  • 如何在 Spring 5 MVC 中将 FilePart 转换为 byte[]

    我有从网络表单接收和上传文件的控制器方法 如何从 FilePart 中提取字节数组并将其保存到数据库 我可以通过使用 FilePart transferTo 将 FilePart 保存到文件中来完成此操作 但这看起来又慢又难看 有更好的方法
  • 在 Spring Webflux 中执行阻塞 JDBC 调用

    我使用 Spring Webflux 和 Spring data jpa 使用 PostgreSql 作为后端数据库 我不想在进行数据库调用时阻塞主线程 例如find and save 为了实现同样的目标 我有一个主调度程序Controll
  • 在可观察项目生成时对其进行处理

    我有一个IObservable它会生成一次性物品 并且在其生命周期内可能会生成无限数量的物品 因此 我想在每次生成新项目时处理最后一个项目 因此Using http reactivex io documentation operators
  • 反应式扩展吞掉线程池线程上调用的 OnNext() 的异常?

    我在 Net 4 5 中使用 Rx 2 当以下代码运行时 它只是静默退出 而不执行 OnCompleted 委托或显示任何错误 如果我使用Scheduler CurrentThread in ToObservable 它至少会抛出错误并终止
  • 如何在 Spring Integration 流程中访问 Flux?

    我尝试访问 Spring Integration 中的 Flux 对象 而不将流声明拆分为两个函数 我想知道如何执行以下操作 Bean public IntegrationFlow mainFlow return IntegrationFl
  • 如何在 Swift Joint 中创建自定义链?

    我正在尝试创建一个LocationManager组合的包装 我有一个发布者和一些触发发布者的函数 但是 我想将它们与自定义命令组合在一起 这是我到目前为止得到的 available OSX 10 15 iOS 13 tvOS 13 watc
  • Rx.NET 中是否有一个Subject 实现,其功能类似于BehaviourSubject,但仅在值发生更改时才发出?

    有没有Subject https learn microsoft com en us previous versions dotnet reactive extensions hh229699 v vs 103 Rx NET 中的实现在功能
  • 如何使用 rxpy/rxjs 延迟事件发射?

    我有两个事件流 一个来自电感环路 另一个来自网络摄像机 汽车将驶过环路 然后撞上相机 如果事件彼此相差在 N 毫秒内 汽车总是会首先进入循环 我想将它们组合起来 但我也希望每个流中不匹配的事件 硬件可能会失败 全部合并到单个流中 像这样的事
  • 与玻璃钢战斗

    我读过有关 FRP 的内容 非常兴奋 它看起来很棒 因此您可以编写更多高级代码 并且一切都更加可组合 等等 然后我尝试用数百个 sloc 从纯 js 到 Bacon 重写我自己的小游戏 我发现 我实际上不是编写高级纯逻辑代码 而是击败了 B
  • 如何轻松地将 Observable 转换或分配给行为主体,以便其他组件可以共享它

    我是可观察风格编程的新手 我有一个问题 我想在组件之间跨应用程序共享用户信息 并且我使用BehaviorSubject 来共享此信息 这是受到将BehaviorSubject 共享为AuthInfo 的启发 如果我可以在我的应用程序组件中共
  • Mono.defer() 是做什么的?

    我在一些 Spring webflux 代码中遇到了 Mono defer 我在文档中查找了该方法 但不明白其解释 创建一个 Mono 提供者 它将提供要订阅的目标 Mono 对于每个下游订阅者 请给我一个解释和一个例子 有没有一个地方有一

随机推荐