使用 IObservable 进行批处理

2024-01-11

我的服务器端向我发送批量消息。批次中的消息数量和频率是任意的。有时,我每隔 1 分钟就会收到一条消息,有时一小时内都没有收到消息。 1 到 10 条消息。

我当前的实现使用Observable.Buffer(TimeSpan.FromSeconds(5))将消息分组并发送给订阅者。

有没有一种方法可以配置 Observable 来表示,如果两条消息之间有 x 秒的延迟,则将缓冲的消息发送给订阅者,而不是每 5 秒检查一次。

如何避免不必要的计时器每 5 秒计时一次? (我愿意接受其他优化批处理的建议。)


使用 bufferClosingSelector 工厂方法

decPL 建议使用重载Buffer接受一个bufferClosingSelector- 在打开新缓冲区时调用的工厂函数。它产生一个流,其第一个OnNext() or OnCompleted()信号刷新当前缓冲区。 decPL 代码如下所示:

observable.Buffer(() => observable.Throttle(TimeSpan.FromSeconds(5)))

这在解决方案方面取得了相当大的进展,但也存在一些问题:

  • 在限制持续时间内持续发布消息的活动期间,服务器不会发送消息。这可能会导致大量且不经常发布的列表。
  • 源有多个订阅;如果天气冷,可能会产生意想不到的副作用。这bufferClosingSelector工厂被称为each缓冲区关闭,因此如果源很冷,它将从初始事件而不是最近的事件中进行限制。

防止无限期节流

我们需要使用额外的机制来限制缓冲区长度并防止无限期的限制。Buffer有一个重载,允许您指定最大长度,但不幸的是您不能将其与关闭选择器结合使用。

让我们调用所需的缓冲区长度限制n。回忆一下第一个OnNext关闭选择器的 足以关闭缓冲区,所以我们需要做的就是Merge带有发送计数流的节流阀OnNext after n来自源头的事件。我们可以用.Take(n).LastAsync()去做这个;采取第一个n事件,但忽略除最后一个之外的所有事件。这是 Rx 中非常有用的模式。

让源头变得“热”

为了解决该问题bufferClosingSelector工厂重新订阅源,我们需要使用通用模式.Publish().RefCount()在源上为我们提供一个仅向订阅者发送最新事件的流。这也是一个非常有用的模式,需要记住。

Solution

这是修改后的代码,其中节流持续时间与计数器合并:

var throttleDuration = TimeSpan.FromSeconds(5);
var bufferSize = 3;

// single subscription to source
var sourcePub = source.Publish().RefCount();

var output = sourcePub.Buffer(
    () => sourcePub.Throttle(throttleDuration) 
                   .Merge(sourcePub.Take(bufferSize).LastAsync()));

生产就绪代码和测试

这是一个带有测试的生产就绪实现(使用 nuget 包 rx-testing 和 nunit)。请注意调度程序的参数化以支持测试。

public static partial class ObservableExtensions
{
    public static IObservable<IList<TSource>> BufferNearEvents<TSource>(
        this IObservable<TSource> source,
        TimeSpan maxInterval,
        int maxBufferSize,
        IScheduler scheduler)
    {
        if (scheduler == null) scheduler = ThreadPoolScheduler.Instance;
        if (maxBufferSize <= 0)
            throw new ArgumentOutOfRangeException(
                "maxBufferSize", "maxBufferSize must be positive");

        var publishedSource = source.Publish().RefCount();

        return publishedSource.Buffer(
            () => publishedSource
                .Throttle(maxInterval, scheduler)
                .Merge(publishedSource.Take(maxBufferSize).LastAsync()));
    }
}

public class BufferNearEventsTests : ReactiveTest
{
    [Test]
    public void CloseEventsAreBuffered()
    {
        TimeSpan maxInterval = TimeSpan.FromTicks(200);
        const int maxBufferSize = 1000;

        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3));

        IList<int> expectedBuffer = new [] {1, 2, 3};
        var expectedTime = maxInterval.Ticks + 300;

        var results = scheduler.CreateObserver<IList<int>>();

        source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
              .Subscribe(results);

        scheduler.AdvanceTo(1000);

        results.Messages.AssertEqual(
            OnNext<IList<int>>(expectedTime, buffer => CheckBuffer(expectedBuffer, buffer)));
    }

    [Test]
    public void FarEventsAreUnbuffered()
    {
        TimeSpan maxInterval = TimeSpan.FromTicks(200);
        const int maxBufferSize = 1000;

        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(1000, 1),
            OnNext(2000, 2),
            OnNext(3000, 3));

        IList<int>[] expectedBuffers =
        {
            new[] {1},
            new[] {2},
            new[] {3}
        };

        var expectedTimes = new[]
        {
            maxInterval.Ticks + 1000,
            maxInterval.Ticks + 2000,
            maxInterval.Ticks + 3000
        };  

        var results = scheduler.CreateObserver<IList<int>>();

        source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
              .Subscribe(results);

        scheduler.AdvanceTo(10000);

        results.Messages.AssertEqual(
            OnNext<IList<int>>(expectedTimes[0], buffer => CheckBuffer(expectedBuffers[0], buffer)),
            OnNext<IList<int>>(expectedTimes[1], buffer => CheckBuffer(expectedBuffers[1], buffer)),
            OnNext<IList<int>>(expectedTimes[2], buffer => CheckBuffer(expectedBuffers[2], buffer)));
    }

    [Test]
    public void UpToMaxEventsAreBuffered()
    {
        TimeSpan maxInterval = TimeSpan.FromTicks(200);
        const int maxBufferSize = 2;

        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3));

        IList<int>[] expectedBuffers =
        {
            new[] {1,2},
            new[] {3}
        };

        var expectedTimes = new[]
        {
            200, /* Buffer cap reached */
            maxInterval.Ticks + 300
        };

        var results = scheduler.CreateObserver<IList<int>>();

        source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
              .Subscribe(results);

        scheduler.AdvanceTo(10000);

        results.Messages.AssertEqual(
            OnNext<IList<int>>(expectedTimes[0], buffer => CheckBuffer(expectedBuffers[0], buffer)),
            OnNext<IList<int>>(expectedTimes[1], buffer => CheckBuffer(expectedBuffers[1], buffer)));
    }

    private static bool CheckBuffer<T>(IEnumerable<T> expected, IEnumerable<T> actual)
    {
        CollectionAssert.AreEquivalent(expected, actual);
        return true;
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 IObservable 进行批处理 的相关文章

  • InvalidOperationException - 对象当前正在其他地方使用 - 红十字

    我有一个 C 桌面应用程序 其中我连续创建的一个线程从源 实际上是一台数码相机 获取图像并将其放在 GUI 中的面板 panel Image img 上 这必须是另一个线程 如它是控件的代码隐藏 该应用程序可以工作 但在某些机器上 我会在随
  • 用于代数简化和求解的 C# 库 [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 网络上有很多代数求解器和简化器 例如 algebra com 上不错的代数求解器和简化器 然而 我正在
  • 未提供参数时如何指定 C# System.Commandline 行为?

    在我的控制台应用程序中 当未提供控制台参数时 将执行我指定列表 在本例中为参数 3 的任何处理程序 调用该处理程序时 布尔参数设置为 false 但对我来说 根本不调用它更有意义 如何防止这种情况发生并显示帮助文本 using System
  • 确保 StreamReader 不会挂起等待数据

    下面的代码读取从 tcp 客户端流读取的所有内容 并且在下一次迭代中它将仅位于 Read 上 我假设正在等待数据 我如何确保它不会在没有任何内容可供读取时返回 我是否必须设置低超时 并在失败时响应异常 或者有更好的办法吗 TcpClient
  • MVC 在布局代码之前执行视图代码并破坏我的脚本顺序

    我正在尝试将所有 javascript 包含内容移至页面底部 我正在将 MVC 与 Razor 一起使用 我编写了一个辅助方法来注册脚本 它按注册顺序保留脚本 并排除重复的内容 Html RegisterScript scripts som
  • 复制 std::function 的成本有多高?

    While std function是可移动的 但在某些情况下不可能或不方便 复制它会受到重大处罚吗 它是否可能取决于捕获变量的大小 如果它是使用 lambda 表达式创建的 它依赖于实现吗 std function通常被实现为值语义 小缓
  • 在 C 中匹配二进制模式

    我目前正在开发一个 C 程序 需要解析一些定制的数据结构 幸运的是我知道它们是如何构造的 但是我不确定如何在 C 中实现我的解析器 每个结构的长度都是 32 位 并且每个结构都可以通过其二进制签名来识别 举个例子 有两个我感兴趣的特定结构
  • 将 Word 文档另存为图像

    我正在使用下面的代码将 Word 文档转换为图像文件 但是图片显得太大 内容不适合 有没有办法渲染图片或将图片保存到合适的尺寸 private void btnConvert Click object sender EventArgs e
  • 是否有实用的理由使用“if (0 == p)”而不是“if (!p)”?

    我倾向于使用逻辑非运算符来编写 if 语句 if p some code 我周围的一些人倾向于使用显式比较 因此代码如下所示 if FOO p some code 其中 FOO 是其中之一false FALSE 0 0 0 NULL etc
  • 具有交替类型的可变参数模板参数包

    我想知道是否可以使用参数包捕获交替参数模式 例如 template
  • 如何检测表单的任何控件的变化?

    如何检测 C 中表单的任何控件的更改 由于我在一个表单上有许多控件 并且如果表单中的任何控件值发生更改 我需要禁用按钮 我正在寻找一些内置函数 事件处理程序 属性 并且不想为此创建自定义函数 不 我不知道任何时候都会触发任何事件any控制表
  • Qt - ubuntu中的串口名称

    我在 Ubuntu 上查找串行端口名称时遇到问题 如您所知 为了在 Windows 上读取串口 我们可以使用以下代码 serial gt setPortName com3 但是当我在 Ubuntu 上编译这段代码时 我无法使用这段代码 se
  • 使用自定义堆的类似 malloc 的函数

    如果我希望使用自定义预分配堆构造类似 malloc 的功能 那么 C 中最好的方法是什么 我的具体问题是 我有一个可映射 类似内存 的设备 已将其放入我的地址空间中 但我需要获得一种更灵活的方式来使用该内存来存储将随着时间的推移分配和释放的
  • C#:帮助理解 UML 类图中的 <>

    我目前正在做一个项目 我们必须从 UML 图编写代码 我了解 UML 类图的剖析 但我无法理解什么 lt
  • C# HashSet 只读解决方法

    这是示例代码 static class Store private static List
  • 无法接收 UDP Windows RT

    我正在为 Windows 8 RT 编写一个 Windows Store Metro Modern RT 应用程序 需要在端口 49030 上接收 UDP 数据包 但我似乎无法接收任何数据包 我已按照使用教程进行操作DatagramSock
  • 我的班级应该订阅自己的公共活动吗?

    我正在使用 C 3 0 遵循标准事件模式我有 public event EventHandler
  • 从列表中选择项目以求和

    我有一个包含数值的项目列表 我需要使用这些项目求和 我需要你的帮助来构建这样的算法 下面是一个用 C 编写的示例 描述了我的问题 int sum 21 List
  • 如何将 PostgreSql 与 EntityFramework 6.0.2 集成? [复制]

    这个问题在这里已经有答案了 我收到以下错误 实体框架提供程序类型的 实例 成员 Npgsql NpgsqlServices Npgsql 版本 2 0 14 2 文化 中性 PublicKeyToken 5d8b90d52f46fda7 没
  • 当我使用 OpenSSL1.1.0g 根据固定的 p 和 g 值创建 Diffie Hellman 密钥协议密钥时,应该执行哪些检查?

    您好 我尝试通过这段代码使用修复 p 和 g 参数来制作 Diffie Hellman Keysanswer https stackoverflow com a 54538811 4706711 include

随机推荐