在 Rx 中,如何按 id 对事件进行分组并按多个时间跨度限制每个组?

2024-05-27

可以这么说,我陷入了 Rx 热潮,这个问题与我的相关here https://stackoverflow.com/questions/19425965/rx-how-to-group-by-a-key-a-complex-object-and-later-do-selectmany-without-stopp and here https://stackoverflow.com/questions/19313360/how-to-partition-groupby-a-stream-and-monitor-absence-of-elements-in-rx-within。尽管如此,也许这些对某人有帮助,因为我可以将它们视为同一主题的有用变体。

问题:如何将随机流分组int(例如,在随机间隔生成的间隔 [0, 10] 上)将对象分组并为每个组提供可变数量的缺少事件警报(由于缺乏更好的定义,有关更多背景信息,请参阅链接的帖子)。更具体地说,通过代码,如何为每组定义多个节流设置,如下所示:

var idAlarmStream = idStream
.Select(i => i)
.GroupByUntil(key => key.Id, grp => grp.Throttle(Timespan.FromMilliseconds(1000))
.SelectMany(grp => grp.TakeLast(1))
.Subscribe(i => Console.WriteLine(i));

如果每组 ID 缺失时间超过一秒,订阅函数将被调用。如果想为没有事件定义三个不同的值(例如,一秒、五秒和十秒)并在事件到达时全部取消,该怎么办?我能想到的是:

  • 将每个 ID 拆分为idStream分成几个合成 ID,并提供真实 ID 和合成 ID 之间的双射映射。例如本例中 ID:1 -> 100, 101, 102; ID: 2 -> 200, 201, 203 然后定义一个选择器函数Throttle像这样Func<int, Timespan>(i => /* switch(i)...*/)然后当Subscribe将被调用,将 ID 映射回来。另请参阅链接的问题以了解更多背景信息。
  • 创建一个嵌套分组,其中 ID 被分组,然后 ID 组将根据限制值被复制/复制/分叉(我不知道正确的术语)到组中。我认为这种方法相当复杂,我不确定它是否是最好的方法。尽管如此,我肯定有兴趣看到这样的查询。

在更一般的设置中,我怀疑,这是每个组有多个处理程序的情况,尽管我还没有找到与此相关的任何内容。

作为(希望澄清)一个例子idStream推送一个 ID: 1,三个不同的计数器将启动,每个计数器等待下一个事件发生,如果没有及时检测到新的 ID 1,则发出警报。计数器 1 (C1) 等待 5 秒,计数器 2 (C2) 等待 7 秒,计数器 3 (C3) 等待 10 秒。如果在间隔 [0, 5] 秒内收到新的 ID 1,则所有计数器将使用上述值重新初始化,并且不会发送警报。如果在间隔 [0, 7) 秒内收到新的 ID,C1 报警,C2 和 C3 将重新初始化。类似地,如果在 [0, 10) 秒间隔内收到新 ID,C1 和 C2 就会触发,但 C3 会重新初始化。

也就是说,在给定某些条件的情况下,针对一个ID将存在多个“缺席警报”或一般而言所采取的操作。我不确定什么是一个好的模拟......也许将“警报灯”堆叠在塔中,以便首先是绿色,然后是黄色,最后是红色。随着 ID 缺失的时间越来越长,一种颜色将会亮起(在这种情况下,红色是最后一种)。然后当检测到一个ID时,所有的灯都会关闭。

将詹姆斯的代码修改为如下示例并保留其余部分后,我发现了Subscribe将在两个警报级别上的第一个事件发生时直接调用。

const int MaxLevels = 2;
var idAlarmStream = idStream
    .Select(i => i)
    .AlarmSystem(keySelector, thresholdSelector, MaxLevels, TaskPoolScheduler.Default)
    .Subscribe(i =>
    {
        Debug.WriteLine("Alarm on id \"{0}\" at {1}", i, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture));
    });

让我们看看这里发生了什么,如果MaxLevels可以动态提供...

.FromTicks我花了几分钟才意识到这一点。


我认为这是可行的 - 我稍后会尝试添加更完整的解释。每个警报级别都有一个定义的阈值(每个信号组)。预计这些活动的持续时间将越来越长。

基本思想是将所有先前级别的信号馈送到当前级别。第一级是信号本身的“零”级,在返回警报流之前将其过滤掉。请注意,TSignal 键需要支持值标识。

我确信还有简化的空间!

单元测试示例:

public class AlarmTests : ReactiveTest
{
    [Test]
    public void MultipleKeyMultipleSignalMultipleLevelTest()
    {
        var threshold1 = TimeSpan.FromTicks(300);
        var threshold2 = TimeSpan.FromTicks(800);

        var scheduler = new TestScheduler();
        var signals = scheduler.CreateHotObservable(
            OnNext(200, 1),
            OnNext(200, 2),
            OnNext(400, 1),
            OnNext(420, 2),
            OnNext(800, 1),
            OnNext(1000, 1),
            OnNext(1200, 1));

        Func<int, int> keySelector = i => i;
        Func<int, int, TimeSpan> thresholdSelector = (key, level) =>
        {
            if (level == 1) return threshold1;
            if (level == 2) return threshold2;
            return TimeSpan.MaxValue;
        };

        var results = scheduler.CreateObserver<Alarm<int>>();

        signals.AlarmSystem(
            keySelector,
            thresholdSelector,
            2,
            scheduler).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(700, new Alarm<int>(1, 1)),
            OnNext(720, new Alarm<int>(2, 1)),
            OnNext(1220, new Alarm<int>(2, 2)),
            OnNext(1500, new Alarm<int>(1, 1)),
            OnNext(2000, new Alarm<int>(1, 2)));
    }

    [Test]
    public void CheckAlarmIsSuppressed()
    {
        var threshold1 = TimeSpan.FromTicks(300);
        var threshold2 = TimeSpan.FromTicks(500);

        var scheduler = new TestScheduler();
        var signals = scheduler.CreateHotObservable(
            OnNext(200, 1),
            OnNext(400, 1),
            OnNext(600, 1));

        Func<int, int> keySelector = i => i;

        Func<int, int, TimeSpan> thresholdSelector = (signal, level) =>
        {
            if (level == 1) return threshold1;
            if (level == 2) return threshold2;
            return TimeSpan.MaxValue;
        };

        var results = scheduler.CreateObserver<Alarm<int>>();

        signals.AlarmSystem(
            keySelector,
            thresholdSelector,
            2,
            scheduler).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(900, new Alarm<int>(1, 1)),
            OnNext(1100, new Alarm<int>(1, 2)));
    }
}



public static class ObservableExtensions
{
    /// <summary>
    /// Create an alarm system that detects signal gaps of length
    /// determined by a signal key and signals alarms of increasing severity.
    /// </summary>
    /// <typeparam name="TSignal">Type of the signal</typeparam>
    /// <typeparam name="TKey">Type of the signal key used for grouping, must implement Equals correctly</typeparam>
    /// <param name="signals">Input signal stream</param>
    /// <param name="keySelector">Function to select a key from a signal for grouping</param>
    /// <param name="thresholdSelector">Function to select a threshold for a given signal key and alarm level.
    /// Should return TimeSpan.MaxValue for levels above the highest level</param>
    /// <param name="levels">Number of alarm levels</param>
    /// <param name="scheduler">Scheduler use for throttling</param>
    /// <returns>A stream of alarms each of which contains the signal and alarm level</returns>
    public static IObservable<Alarm<TSignal>> AlarmSystem<TSignal, TKey>(
        this IObservable<TSignal> signals,
        Func<TSignal, TKey> keySelector,
        Func<TKey, int, TimeSpan> thresholdSelector,
        int levels,
        IScheduler scheduler)
    {
        var alarmSignals = signals.Select(signal => new Alarm<TSignal>(signal, 0))
                                  .Publish()
                                  .RefCount();

        for (int i = 0; i < levels; i++)
        {
            alarmSignals = alarmSignals.CreateAlarmSystemLevel(
                keySelector, thresholdSelector, i + 1, scheduler);
        }

        return alarmSignals.Where(alarm => alarm.Level != 0);

    }

    private static IObservable<Alarm<TSignal>> CreateAlarmSystemLevel<TSignal, TKey>(
        this IObservable<Alarm<TSignal>> alarmSignals,
        Func<TSignal, TKey> keySelector,
        Func<TKey, int, TimeSpan> thresholdSelector,
        int level,
        IScheduler scheduler)
    {
        return alarmSignals
            .Where(alarmSignal => alarmSignal.Level == 0)
            .Select(alarmSignal => alarmSignal.Signal)
            .GroupByUntil(
                keySelector,
                grp => grp.Throttle(thresholdSelector(grp.Key, level), scheduler))
            .SelectMany(grp => grp.TakeLast(1).Select(signal => new Alarm<TSignal>(signal, level)))
            .Merge(alarmSignals);
    }
}

public class Alarm<TSignal> : IEquatable<Alarm<TSignal>>
{
    public Alarm(TSignal signal, int level)
    {
        Signal = signal;
        Level = level;
    }

    public TSignal Signal { get; private set; }
    public int Level { get; private set; }

    private static bool Equals(Alarm<TSignal> x, Alarm<TSignal> y)
    {
        if (ReferenceEquals(x, null))
            return false;
        if (ReferenceEquals(y, null))
            return false;
        if (ReferenceEquals(x, y))
            return true;

        return x.Signal.Equals(y.Signal) && x.Level.Equals(y.Level);
    }

    // Equality implementation added to help with testing.
    public override bool Equals(object other)
    {
        return Equals(this, other as Alarm<TSignal>);
    }

    public override string ToString()
    {
        return string.Format("Signal: {0} Level: {1}", Signal, Level);
    }

    public bool Equals(Alarm<TSignal> other)
    {
        return Equals(this, other);
    }

    public static bool operator ==(Alarm<TSignal> x, Alarm<TSignal> y)
    {
        return Equals(x, y);
    }

    public static bool operator !=(Alarm<TSignal> x, Alarm<TSignal> y)
    {
        return !Equals(x, y);
    }

    public override int GetHashCode()
    {
        return ((Signal.GetHashCode()*37) ^ Level.GetHashCode()*329);
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Rx 中,如何按 id 对事件进行分组并按多个时间跨度限制每个组? 的相关文章

  • 以文化中立的方式将字符串拆分为单词

    我提出了下面的方法 旨在将可变长度的文本拆分为单词数组 以进行进一步的全文索引处理 删除停止词 然后进行词干分析 结果似乎不错 但我想听听关于这种实现对于不同语言的文本的可靠性的意见 您会建议使用正则表达式来代替吗 请注意 我选择不使用 S
  • GLKit的GLKMatrix“列专业”如何?

    前提A 当谈论线性存储器中的 列主 矩阵时 列被一个接一个地指定 使得存储器中的前 4 个条目对应于矩阵中的第一列 另一方面 行主 矩阵被理解为依次指定行 以便内存中的前 4 个条目指定矩阵的第一行 A GLKMatrix4看起来像这样 u
  • 为什么两个不同的 Base64 字符串的转换会返回相等的字节数组?

    我想知道为什么从 base64 字符串转换会为不同的字符串返回相同的字节数组 const string s1 dg const string s2 dq byte a1 Convert FromBase64String s1 byte a2
  • 不支持将数据直接绑定到存储查询(DbSet、DbQuery、DbSqlQuery)

    正在编码视觉工作室2012并使用实体模型作为我的数据层 但是 当页面尝试加载时 上面提到的标题 我使用 Linq 语句的下拉控件往往会引发未处理的异常 下面是我的代码 using AdventureWorksEntities dw new
  • 类模板参数推导 - clang 和 gcc 不同

    下面的代码使用 gcc 编译 但不使用 clang 编译 https godbolt org z ttqGuL template
  • 用于登录 .NET 的堆栈跟踪

    我编写了一个 logger exceptionfactory 模块 它使用 System Diagnostics StackTrace 从调用方法及其声明类型中获取属性 但我注意到 如果我在 Visual Studio 之外以发布模式运行代
  • OleDbDataAdapter 未填充所有行

    嘿 我正在使用 DataAdapter 读取 Excel 文件并用该数据填充数据表 这是我的查询和连接字符串 private string Query SELECT FROM Sheet1 private string ConnectStr
  • 堆栈溢出:堆栈空间中重复的临时分配?

    struct MemBlock char mem 1024 MemBlock operator const MemBlock b const return MemBlock global void foo int step 0 if ste
  • 使用 WebClient 时出现 System.Net.WebException:无法创建 SSL/TLS 安全通道

    当我执行以下代码时 System Net ServicePointManager ServerCertificateValidationCallback sender certificate chain errors gt return t
  • C#中如何移动PictureBox?

    我已经使用此代码来移动图片框pictureBox MouseMove event pictureBox Location new System Drawing Point e Location 但是当我尝试执行时 图片框闪烁并且无法识别确切
  • C++ OpenSSL 导出私钥

    到目前为止 我成功地使用了 SSL 但遇到了令人困惑的障碍 我生成了 RSA 密钥对 之前使用 PEM write bio RSAPrivateKey 来导出它们 然而 手册页声称该格式已经过时 实际上它看起来与通常的 PEM 格式不同 相
  • 重载<<的返回值

    include
  • while 循环中的 scanf

    在这段代码中 scanf只工作一次 我究竟做错了什么 include
  • 控件的命名约定[重复]

    这个问题在这里已经有答案了 Microsoft 在其网站上提供了命名指南 here http msdn microsoft com en us library xzf533w0 VS 71 aspx 我还有 框架设计指南 一书 我找不到有关
  • 垃圾收集器是否在单独的进程中运行?

    垃圾收集器是否在单独的进程中启动 例如 如果我们尝试测量某段代码所花费的进程时间 并且在此期间垃圾收集器开始收集 它会在新进程上启动还是在同一进程中启动 它的工作原理如下吗 Code Process 1 gt Garbage Collect
  • 如何使用 C# / .Net 将文件列表从 AWS S3 下载到我的设备?

    我希望下载存储在 S3 中的多个图像 但目前如果我只能下载一个就足够了 我有对象路径的信息 当我运行以下代码时 出现此错误 遇到错误 消息 读取对象时 访问被拒绝 我首先做一个亚马逊S3客户端基于我的密钥和访问配置的对象连接到服务器 然后创
  • C# 成员变量继承

    我对 C 有点陌生 但我在编程方面有相当广泛的背景 我想做的事情 为游戏定义不同的 MapTiles 我已经像这样定义了 MapTile 基类 public class MapTile public Texture2D texture pu
  • 是否可以在 .NET Core 中将 gRPC 与 HTTP/1.1 结合使用?

    我有两个网络服务 gRPC 客户端和 gRPC 服务器 服务器是用 NET Core编写的 然而 客户端是托管在 IIS 8 5 上的 NET Framework 4 7 2 Web 应用程序 所以它只支持HTTP 1 1 https le
  • C# - OutOfMemoryException 在 JSON 文件上保存列表

    我正在尝试保存压力图的流数据 基本上我有一个压力矩阵定义为 double pressureMatrix new double e Data GetLength 0 e Data GetLength 1 基本上 我得到了其中之一pressur
  • 如何在文本框中插入图像

    有没有办法在文本框中插入图像 我正在开发一个聊天应用程序 我想用图标图像更改值 等 但我找不到如何在文本框中插入图像 Thanks 如果您使用 RichTextBox 进行聊天 请查看Paste http msdn microsoft co

随机推荐

  • travis-ci 安装程序使用 --github-token 发布

    我在使用带有 github 令牌的安装版本时遇到问题 我喜欢 travis ci 但我不愿意透露我的 github 密码 我需要使用令牌并且我阅读了文档 因为这应该可以通过这种方式实现 不幸的是它仍然要求输入密码 travis login
  • bin 文件夹内任何文件的任何更改是否会导致 ASP.NET Web 应用程序中的应用程序回收?

    我知道在 ASP NET Web 应用程序中 更改位于bin文件夹会导致应用程序回收 但我想知道 正如主题所暗示的那样 是否any文件更改会导致这种行为吗 此场景中是否包含简单的文本文件 那么子文件夹呢 bin文件夹 它们的内容呢 我知道我
  • 并行 Haskell - GHC GC 火花

    我有一个正在尝试并行化的程序 带有可运行代码的完整粘贴here http lpaste net 101528 我进行了分析 发现大部分时间都花在findNearest这本质上是一个简单的foldr超过一个大Data Map findNear
  • 如何使用 with open 在 pySpark 中打开存储在 HDFS 中的文件

    如何打开存储在 HDFS 中的文件 这里输入文件来自 HDFS 如果我按如下方式提供文件 我将无法打开 它将显示为找不到文件 from pyspark import SparkConf SparkContext conf SparkConf
  • 是否可以将 Vagrant 与 intelliJ 一起使用?

    假设我正在使用 Java 并使用 IntelliJ 来执行构建和部署等操作以及其他类似操作 我以前没有使用过 Vagrant 但是在运行 Vagrant 实例时是否可以继续使用 IntelliJ 进行构建和部署 是的 您可以将 IDE 与
  • 每次都在django查询数据库中过滤查询集吗?

    想象一下我有以下代码 qs Users objects all list for i in range 10 list append qs filter age i 这里过滤器被调用 10 次 它是连接到数据库 10 次还是第一次使用过滤器
  • 如何对数字进行排序? [复制]

    这个问题在这里已经有答案了 下面是代码 Is the sortNumber对数字进行排序的函数 a 和 b 是什么意思以及为什么存在 为什么sortNumber in n sort sortNumber 没有指定任何参数a and b Ja
  • 如何设置打开文件时默认展开?

    In my vimrc我已经把set foldmethod syntax启用方法折叠等 但是 我不喜欢每次打开文件时都会折叠整个文件的默认设置 有没有办法启用foldmethod 但是当我打开文件时文件是否展开了 set foldlevel
  • 在实体框架中比较日期的最佳方法

    我在实体框架的 where 子句中使用日期并收到以下错误 这是由于以下代码 var entity dbContext MyTable Where w gt w PId 3 w CreatedOn Date mydate Date First
  • 带有客户端证书的android webview

    我尝试了几天使用嵌入在应用程序中的客户端证书的Web视图 但在我看来 android sdk没有提供任何方法来做到这一点 是否有回调来拦截服务器发送的质询 有没有办法将 webview 与客户端证书一起使用并发出 https 请求 因为我也
  • django:url 标签 -> 如何使用变量作为 url_name?

    我有一个 django 视图 它声明了一个目标变量 target name of next view to call return render request template locals 我想在我的模板中使用这个目标变量 我尝试了以下
  • 从 Bigcommerce 的浏览器内存中删除注入的分析库?

    我们如何删除这个脚本注入器系统并清除内存中的函数 简报 最近 Bigcommerce 的不法分子以 监控 为幌子创建了一个分析注入器 JS 该注入器被锁定在全局变量中 他们在未经任何 OP 同意的情况下将其推广到所有 50 000 家前台商
  • 是什么让热部署成为“难题”?

    在工作中 我们经常遇到这样的问题 永久代内存不足 http www jroller com agileanswers entry preventing java s java lang例外 团队负责人认为这是 JVM 中的一个错误 与代码的
  • 将十进制转换为十六进制

    首先 这是家庭作业 我正在尝试将 5 位数字读入寄存器 bx 假定该数字不大于 65535 16 位 以下是我尝试这样做的方法 但是 当我尝试打印该号码时 我仅打印输入的最后一位数字 这让我猜测 当我向 bx 添加另一个数字时 它会覆盖以前
  • 当页面加载图像时,它是只加载一次,还是每次在标记中找到它时加载?

    当页面加载图像时 它是只加载一次 还是每次在标记中找到它时加载 那么 jquery 呢 附加一个 img 会导致它再次重新加载吗 我问这个问题是因为我有高分辨率图像 但需要在标记的许多情况下使用它 img src hello jpg img
  • qt 如何知道按钮被点击?

    我正在尝试编写一个程序 用声音进行一些操作 我的问题是我有 3 个播放按钮和 3 个标签 我希望无论我单击 播放 按钮 都应该播放按钮附近标签中名称的声音 我有一个没有任何参数的播放插槽 那么 如何分别连接到每个播放按钮和每个标签呢 实际上
  • JavaScript Uncaught ReferenceError:jQuery 未定义;未捕获的引用错误:$未定义[重复]

    这个问题在这里已经有答案了 这是我的小提琴http jsfiddle net 4vaxE 35 http jsfiddle net 4vaxE 35 它在我的小提琴中工作得很好 但是 当我将其转移到dreamweaver时 它无法工作 我在
  • 高图表的分散工具提示未显示

    我有一个散点图和条形图 我无法查看酒吧上方散点的工具提示 这是小提琴http jsfiddle net tZ9Rt http jsfiddle net tZ9Rt 我正在使用这两个系列 series type scatter index 2
  • 我可以使用 VBA 将密码“传递”到 Excel 中的外部数据库连接吗?

    我正在尝试使用 VBA 隐藏我在 Excel 工作表中设置的数据连接的密码 由于 Excel 以纯文本形式存储外部数据源的密码 因此我想让 VBA 调用表的刷新并提供密码 我录制了刷新表格并输入密码的宏 但令我沮丧的是 它似乎省略了密码部分
  • 在 Rx 中,如何按 id 对事件进行分组并按多个时间跨度限制每个组?

    可以这么说 我陷入了 Rx 热潮 这个问题与我的相关here https stackoverflow com questions 19425965 rx how to group by a key a complex object and