具有自定义计数标准的 RxJava 缓冲区/窗口

2024-05-03

我有一个 Observable,它发出许多对象,我想使用以下方法对这些对象进行分组:window or buffer运营。但是,不是指定count用于确定窗口中应有多少对象的参数我希望能够使用自定义标准。

例如,假设可观察对象正在发出 a 的实例Message像下面这样的类。

class Message(
   val int size: Int
)

我想根据它们的消息实例来缓冲或窗口size变量不仅仅是它们的计数。例如,获取总大小最多为 5000 的消息窗口。

// Something like this
readMessages()
    .buffer({ message -> message.size }, 5000)

是否有捷径可寻?


首先我必须承认,我不是 RxJava 专家。 我只是发现你的问题具有挑战性,并试图找到解决方案。

有一个window()带参数的函数boundaryIndicator。你必须创建一个Publisher/ Flowable如果达到窗口大小,则发出一个项目。

在示例中我创建了一个对象windowManager用作boundaryIndicator。在里面onNext回调我调用windowManager并给它一个打开新窗口的机会。

val windowManager = object {
    lateinit var emitter: FlowableEmitter<Unit>
    var windowSize: Long = 0

    fun createEmitter(emitter: FlowableEmitter<Unit>) {
        this.emitter = emitter
    }

    fun openWindowIfRequired(size: Long) {
        windowSize += size
        if (windowSize > 5) {
            windowSize = 0
            emitter.onNext(Unit)
        }
    }
}

val windowBoundary = Flowable.create<Unit>(windowManager::createEmitter, BackpressureStrategy.ERROR)

Flowable.interval(1, TimeUnit.SECONDS).window(windowBoundary).subscribe {
    it.doOnNext {
        windowManager.openWindowIfRequired(it)
    }.doOnSubscribe {
        println("Open window")
    }.doOnComplete {
        println("Close window")
    }.subscribe {
        println(it)
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

具有自定义计数标准的 RxJava 缓冲区/窗口 的相关文章

  • 使用 RxJava 限制吞吐量

    我现在遇到的情况很难解释 所以我会写一个更简单的版本来解释这个问题 我有一个Observable from 它发出一系列由ArrayList文件数量 所有这些文件都应上传到服务器 为此 我有一个函数可以完成这项工作并返回一个Observab
  • 尝试在空对象引用上调用虚拟方法 -> 解析?

    我尝试使用一个按钮从一个主要活动切换到另一个活动 但是当我尝试运行该应用程序时 它在单击该应用程序后立即自行关闭 我在命令中发现错误消息 Unable to start activity ComponentInfo fr amseu mys
  • 如何强制客户端代码使用合约初始化 Kotlin 中所有必需的构建器字段?

    在 2019 年 JetBrains 开放日上 据说 Kotlin 团队研究了合约并试图实现context允许仅在某些上下文中调用函数的合约 例如函数build仅当以下情况时才允许被调用setName方法在它之前被调用过一次 Here ht
  • Retrofit/Moshi:平台类 java.util.Date 需要显式注册 JsonAdapter

    我是 Android Retrofit 和 Moshi 的新手 我正在尝试对 API 进行 POST 调用 但在序列化方面遇到问题Date 如果您发现任何其他需要纠正的地方 请指出 因为我仍在学习 谢谢 interface ApiInter
  • Jackson Kotlin - 反序列化 JsonNode

    Problem 我有字符串形式的 JSON 内容 我首先想用 Jackson 以编程方式遍历它 然后 当我有感兴趣的节点时 我想反序列化它 我尝试过的 我已使用 mapper readValue 成功反序列化字符串 但现在我想在 jsonN
  • 为什么赋值不是语句

    我有以下代码 class Presenter private var view View null fun attachView view View this view view error Assignment is not a stat
  • Firebird 和 Android JDBC 驱动程序

    火鸟有问题 我从未与 DB 合作过 服务器 firebird 1 5 上的数据库 添加库 firebird full 2 2 4到 libs 文件夹 将其添加到 Gradle implementation fileTree libs 将其添
  • 返回 RxJava 的 Completable 的方法的命名约定

    我有一个带有视图类的 Android 应用程序 Fragment Activity 观察其ViewModel The ViewModel公开方法 例如getUserName返回Observable
  • Kotlin 无法编译库

    There s this http github com theapache64 BugMailer我创建的库是为了通过电子邮件报告异常情况 它适用于 Android Java 项目 但不适用于 Android Kotlin 当我添加库的编
  • 未向 HAL 提供足够的数据,预期位置

    我在 Android Studio 中收到此错误 我只想在按下按钮时打印文本 我收到以下错误 每次按下按钮时都会出现该错误 如果我取消注释掉意图 它也可以正常工作 但是我拥有的代码越多 错误更改就越多 我实际上不确定这是真正的错误 我这么说
  • 蓝牙权限在 jetpack compose 中无法正常工作

    我在用com google accompanist accompanist permissions 0 25 1在我的项目中 我正在尝试在运行时请求蓝牙权限 我想知道用户如何知道权限被永久禁用 清单 xml
  • 在 Kotlin 中声明静态属性?

    My Java code public class Common public static ModelPengguna currentModelPengguna public class Common companion object v
  • Kotlin 支持 Java 11 吗?

    我尝试使用 Kotlin V1 2 70 Gradle V4 10 1 和 Java 11 使用 gradle 编译项目时 出现错误 未知 JVM 目标版本 11 支持的版本 1 6 1 8 Kotlin 编译器是否支持 Java 11 生
  • 如何在 Kotlin 中使用参数进行延迟初始化

    在 Kotlin 中 我可以在没有参数的情况下执行延迟初始化 如下声明 val presenter by lazy initializePresenter abstract fun initializePresenter T 但是 如果我的
  • 是否可以在图片上叠加图标

    我正在创建一个允许用户上传图片的应用程序 当图片上传成功后 我想在右上角添加一个绿色的勾号 可绘制 失败时也一样 但有一个十字 Atm 我正在使用 Glide 在屏幕上显示 URI 我怎样才能做到这一点 您可以通过调用 glide 侦听器来
  • 如何在 Android 应用程序中每天重复一个操作?

    我每天都想重复一个动作 即使应用程序未运行或设备已重新启动 重新启动 它也必须继续工作 在我的代码中 我尝试每 1 分钟显示一条 TOAST 消息 作为测试 它在模拟器中工作正常 但在真实设备上不起作用 我尝试对修复进行一些更改 正如我在一
  • 如何在 Android 中动态添加新的 Android 芯片?

    我有一个名为 Question 的类 其中包含标签的字符串数组 我试图使用 Kotlin 显示 Recyclerview 中的每个问题以及新芯片中的每个标签 这些芯片将包含在一个 ChipGroup 中 我的问题是 如何将数组的每个标签元素
  • 6:需要显示BuyFlow UI

    There is a problem when i am click on payWithGoogle Button I am implementing Google Pay in my Android Application and wh
  • 异常后如何恢复流程

    我有以下代码 val channel BroadcastChannel
  • Kotlin:使用 Picasso 从 flickr 加载图像时出现错误 503

    我的应用程序使用 Android 的 Picasso 库从 flickr 加载图像 奇怪的是 不久前将我的应用程序迁移到 Kotlin 后 它工作得很好 但现在我开始出现 随机 503 错误 我已经在 flickr 控制面板中为每个图像设置

随机推荐

  • 通过覆盖或样式设置使 ScrollViewer 的 ScrollBar 始终可见

    我试图使 ScrollViewer 的 ScrollBar 始终可见 这样它不仅在我尝试滚动文本视图时才出现 这样用户就知道还有更多内容可以查看 起初 出于某种原因 我认为我只需要更改需要画笔覆盖的颜色 但实际上 ScrollBar 正在淡
  • 用于通过 Apple 登录的自定义圆形按钮

    我遵循 Apple 的指南来实施 使用 Apple 登录 按钮 苹果在文档中表示 也可以仅使用徽标来创建 使用Apple登录 的自定义按钮 您也可以更改图像的形状以具有圆形按钮 为了能够编辑图像 它还提供插入蒙版 但我不明白我们必须为按钮或
  • 如果我后面不写“as Something”,用“Dim”来声明变量是没有用的吗?

    例如 下面两个代码是否相同 如果我之后不写 作为整数 是否没有必要使用 Dim Sub something MyNumber 10 Worksheets 1 Range A1 MyNumber End Sub and Sub somethi
  • 打开图层地图,经纬度获取地址

    我正在尝试获取带有经度和纬度的地址 城市 邮政编码 街道地址 但我不知道如何获取 我正在使用开放图层 当我单击地图的一部分时 会获取该位置的经度和纬度 有人有解决方案吗 div class map div
  • 检测wifi是否启用(无论是否连接)

    对于 GPS 跟踪应用程序来说 在打开 WIFI 的情况下记录位置信号会导致数据非常不精确或存在间隙 在开始跟踪之前 我已使用可达性查询来检测 wifi 是否可用 问题是 如果进行该查询时 wifi 已启用但未连接到网络 则表明无法通过 w
  • 如何从偏移量获取时区名称?

    我正在使用时刻时区进行时区计算 我都有一些offset来自数据库的数据 例如GMT GMT 1 GMT 2 GMT 3 GMT 4 etc 无论如何 我可以从这些数据中获取时区或时区名称 例如 America Los Angeles 吗 我
  • Capistrano 无法部署到远程服务器

    SOLUTION 解决方案是将以下内容添加到 production rb 的顶部 unshift File expand path lib ENV rvm path Add RVM s lib directory to the load p
  • 使用 Qt 在 xoverlay 之上绘制

    我希望在使用 Xoverlay 渲染的视频流之上绘制一些 UI 我正在使用 gstreamer 播放视频并使用 xoverlay 在 xvimagesink 上渲染它 我的小部件继承自 QGLWidget 我希望使用 QPainter 绘制
  • 封装的闭包与类?

    我是 JS 来自 C etc 的新手 我突然想到闭包似乎是比类更简单 更方便的处理封装的方法 这段代码似乎给出了一种处理封装的简单方法 function addProperty o var value o get function retu
  • 我的小程序需要客户端访问资源的权限的策略文件位置在哪里?

    我发现我必须编写一个策略文件来授予我的小程序权限 但我真的很困惑 我想编写一个小程序 它是一个地图查看器 我需要在运行我的小程序的客户端上保存图像图块以在本地访问图块 以获得安全地查看地图的速度和时间 这对用户有利 因此 小程序需要授予读
  • 提升灵气自定义综合属性(通过语义动作设置结构体属性的特定成员)

    假设我有一个结构体 我想用灵气解析成 它的定义如下 struct data bool export std wstring name data export false 另外 假设该结构已适应融合 如下所示 BOOST FUSION ADA
  • 如何读取从 Access 导入的 SAS 数据集(不符合 SAS 命名约定)

    我已使用 Libname 将 Access DB 导入 SAS 库名称 accdb c mydata base accdb DB 中的所有表现在都在 accdb 库中 但 Access DB 中的表名称与 SAS 数据集命名约定不匹配 我的
  • 将泛型与 Firebase snapshot.getValue() 结合使用的最佳实践

    TL DR 如何正确使用 Firebase DataSnapshot getValue 的泛型类 用例 我想使用 Firebase 为我的所有实体 其中一堆 实现一个通用远程数据源类 当监听数据更改时 我想从 datasnapshot 获取
  • Android Studio 中过时的 Kotlin 运行时警告

    下载并安装最新的 Kotlin 插件后 我有过时的 Kotlin 运行时来自 Android Studio 的警告告诉我 您在 kotlin stdlib 1 1 2 库中的 Kotlin 运行时版本是 1 1 2 而插件版本是1 1 2
  • 从 PHP/Web 应用程序打印多个标签到 Dymo LabelWriter 450 Turbo

    我希望添加使用 Dymo LabelWriter 450 Turbo 打印多个标签的功能 我已经从 Dymo 网站下载了 DYMO Label v 8 SDK dmg 但看不到任何 Javascript Web 相关的 SDK 文件或文档
  • 如何在D3中导入json数据?

    如何在D3中导入json文件 I did d3 json temp json 但是我如何在进一步的代码中访问这个数据集呢 到目前为止我已经尝试过 var data d3 json temp json 但使用 data data 在其余代码中
  • 使用 ffmpeg 从 unix 命令批量将 wav 文件转换为 16 位

    我有一个由许多子文件夹组成的文件夹 每个子文件夹都有其他子文件夹 其中包含 wav 文件 我想像这样转换所有文件 ffmpeg i BmBmGG BmBmBmBm wav acodec pcm s16le ar 44100 BmBmGG B
  • 使用 RSQLite 库时加载 MacPorts SQLite3

    我在 SQLite 一个计算乘积的聚合器 中有一个用户定义的函数 它在 R 之外工作得很好 但是我有时在 Mac 上 如果您想添加您的 SQLite3 则需要 MacPorts 版本的 SQLite3自己的功能 扩展 我可以选择 RSQLi
  • 在 Angular 中将图像 url 转换为 base64

    我正在努力尝试将给定的图像 url 转换为 base64 在我的例子中 我有一个带有图像路径的字符串 var imgUrl assets logoEmpresas empresa logoUrl 我如何直接将给定的图像网址转换为base64
  • 具有自定义计数标准的 RxJava 缓冲区/窗口

    我有一个 Observable 它发出许多对象 我想使用以下方法对这些对象进行分组 window or buffer运营 但是 不是指定count用于确定窗口中应有多少对象的参数我希望能够使用自定义标准 例如 假设可观察对象正在发出 a 的