TPL数据流处理N条最新消息

2023-12-07

我正在尝试创建某种队列来处理收到的 N 个最新消息。现在我有这个:

private static void SetupMessaging()
{
    _messagingBroadcastBlock = new BroadcastBlock<string>(msg => msg, new ExecutionDataflowBlockOptions
    {
        //BoundedCapacity = 1,
        EnsureOrdered = true,
        MaxDegreeOfParallelism = 1,
        MaxMessagesPerTask = 1
    });

    _messagingActionBlock = new ActionBlock<string>(msg =>
    {
        Console.WriteLine(msg);
        Thread.Sleep(5000);
    }, new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 2,
        EnsureOrdered = true,
        MaxDegreeOfParallelism = 1,
        MaxMessagesPerTask = 1    
    });

    _messagingBroadcastBlock.LinkTo(_messagingActionBlock, new DataflowLinkOptions { PropagateCompletion = true });
    _messagingBroadcastBlock.LinkTo(DataflowBlock.NullTarget<string>());
}

问题是,如果我向其发布 1,2,3,4,5,我将得到 1,2,5,但我希望它是 1,4,5。欢迎任何建议。
UPD 1
我能够使以下解决方案发挥作用

class FixedCapacityActionBlock<T>
{
    private readonly ActionBlock<CancellableMessage<T>> _actionBlock;

    private readonly ConcurrentQueue<CancellableMessage<T>> _inputCollection = new ConcurrentQueue<CancellableMessage<T>>();

    private readonly int _maxQueueSize;

    private readonly object _syncRoot = new object();

    public FixedCapacityActionBlock(Action<T> act, ExecutionDataflowBlockOptions opt)
    {
        var options = new ExecutionDataflowBlockOptions
        {
            EnsureOrdered = opt.EnsureOrdered,
            CancellationToken = opt.CancellationToken,
            MaxDegreeOfParallelism = opt.MaxDegreeOfParallelism,
            MaxMessagesPerTask = opt.MaxMessagesPerTask,
            NameFormat = opt.NameFormat,
            SingleProducerConstrained = opt.SingleProducerConstrained,
            TaskScheduler = opt.TaskScheduler,
            //we intentionally ignore this value
            //BoundedCapacity = opt.BoundedCapacity
        };
        _actionBlock = new ActionBlock<CancellableMessage<T>>(cmsg =>
        {
            if (cmsg.CancellationTokenSource.IsCancellationRequested)
            {
                return;
            }

            act(cmsg.Message);
        }, options);

        _maxQueueSize = opt.BoundedCapacity;
    }

    public bool Post(T msg)
    {
        var fullMsg = new CancellableMessage<T>(msg);

        //what if next task starts here?
        lock (_syncRoot)
        {
            _inputCollection.Enqueue(fullMsg);

            var itemsToDrop = _inputCollection.Skip(1).Except(_inputCollection.Skip(_inputCollection.Count - _maxQueueSize + 1));

            foreach (var item in itemsToDrop)
            {
                item.CancellationTokenSource.Cancel();
                CancellableMessage<T> temp;
                _inputCollection.TryDequeue(out temp);
            }

            return _actionBlock.Post(fullMsg);
        }
    }
}

And

class CancellableMessage<T> : IDisposable
{
    public CancellationTokenSource CancellationTokenSource { get; set; }

    public T Message { get; set; }

    public CancellableMessage(T msg)
    {
        CancellationTokenSource = new CancellationTokenSource();
        Message = msg;
    }

    public void Dispose()
    {
        CancellationTokenSource?.Dispose();
    }
}

虽然这有效并且实际上完成了工作,但是这个实现看起来很脏,而且也可能不是线程安全的。


这里有一个TransformBlock and ActionBlock每当收到较新的消息并且接收到新消息时,就会删除队列中最旧的消息BoundedCapacity已达到限制。它的行为非常类似于Channel配置有BoundedChannelFullMode.DropOldest.

public static IPropagatorBlock<TInput, TOutput>
    CreateTransformBlockDropOldest<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null,
    IProgress<TInput> droppedMessages = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();

    var boundedCapacity = dataflowBlockOptions.BoundedCapacity;
    var cancellationToken = dataflowBlockOptions.CancellationToken;

    var queue = new Queue<TInput>(Math.Max(0, boundedCapacity));

    var outputBlock = new BufferBlock<TOutput>(new DataflowBlockOptions()
    {
        BoundedCapacity = boundedCapacity,
        CancellationToken = cancellationToken
    });

    if (boundedCapacity != DataflowBlockOptions.Unbounded)
        dataflowBlockOptions.BoundedCapacity = checked(boundedCapacity * 2);
    // After testing, at least boundedCapacity + 1 is required.
    // Make it double to be sure that all non-dropped messages will be processed.
    var transformBlock = new ActionBlock<object>(async _ =>
    {
        TInput item;
        lock (queue)
        {
            if (queue.Count == 0) return;
            item = queue.Dequeue();
        }
        var result = await transform(item).ConfigureAwait(false);
        await outputBlock.SendAsync(result, cancellationToken).ConfigureAwait(false);
    }, dataflowBlockOptions);
    dataflowBlockOptions.BoundedCapacity = boundedCapacity; // Restore initial value

    var inputBlock = new ActionBlock<TInput>(item =>
    {
        var droppedEntry = (Exists: false, Item: (TInput)default);
        lock (queue)
        {
            transformBlock.Post(null);
            if (queue.Count == boundedCapacity) droppedEntry = (true, queue.Dequeue());
            queue.Enqueue(item);
        }
        if (droppedEntry.Exists) droppedMessages?.Report(droppedEntry.Item);
    }, new ExecutionDataflowBlockOptions()
    {
        CancellationToken = cancellationToken
    });

    PropagateCompletion(inputBlock, transformBlock);
    PropagateFailure(transformBlock, inputBlock);
    PropagateCompletion(transformBlock, outputBlock);
    _ = transformBlock.Completion.ContinueWith(_ => { lock (queue) queue.Clear(); },
        TaskScheduler.Default);

    return DataflowBlock.Encapsulate(inputBlock, outputBlock);

    async void PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        var exception = source.Completion.IsFaulted ? source.Completion.Exception : null;
        if (exception != null) target.Fault(exception); else target.Complete();
    }
    async void PropagateFailure(IDataflowBlock source, IDataflowBlock target)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        if (source.Completion.IsFaulted) target.Fault(source.Completion.Exception);
    }
}

// Overload with synchronous lambda
public static IPropagatorBlock<TInput, TOutput>
    CreateTransformBlockDropOldest<TInput, TOutput>(
    Func<TInput, TOutput> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null,
    IProgress<TInput> droppedMessages = null)
{
    return CreateTransformBlockDropOldest(item => Task.FromResult(transform(item)),
        dataflowBlockOptions, droppedMessages);
}

// ActionBlock equivalent
public static ITargetBlock<TInput>
    CreateActionBlockDropOldest<TInput>(
    Func<TInput, Task> action,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null,
    IProgress<TInput> droppedMessages = null)
{
    if (action == null) throw new ArgumentNullException(nameof(action));
    var block = CreateTransformBlockDropOldest<TInput, object>(
        async item => { await action(item).ConfigureAwait(false); return null; },
        dataflowBlockOptions, droppedMessages);
    block.LinkTo(DataflowBlock.NullTarget<object>());
    return block;
}

// ActionBlock equivalent with synchronous lambda
public static ITargetBlock<TInput>
    CreateActionBlockDropOldest<TInput>(
    Action<TInput> action,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null,
    IProgress<TInput> droppedMessages = null)
{
    return CreateActionBlockDropOldest(
        item => { action(item); return Task.CompletedTask; },
        dataflowBlockOptions, droppedMessages);
}

这个想法是将排队的项目存储在辅助中Queue,并将虚拟(空)值传递给内部ActionBlock<object>。该块忽略作为参数传递的项目,并从队列中获取一个项目(如果有)。 αlock用于确保队列中所有未丢弃的项目最终都会被处理(当然除非发生异常)。

还有一个额外的功能。可选的IProgress<TInput> droppedMessages参数允许在每次删除消息时接收通知。

使用示例:

_messagingActionBlock = CreateActionBlockDropOldest<string>(msg =>
{
    Console.WriteLine($"Processing: {msg}");
    Thread.Sleep(5000);
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 2,
}, new Progress<string>(msg =>
{
    Console.WriteLine($"Message dropped: {msg}");
}));
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

TPL数据流处理N条最新消息 的相关文章

  • 在 C 中匹配二进制模式

    我目前正在开发一个 C 程序 需要解析一些定制的数据结构 幸运的是我知道它们是如何构造的 但是我不确定如何在 C 中实现我的解析器 每个结构的长度都是 32 位 并且每个结构都可以通过其二进制签名来识别 举个例子 有两个我感兴趣的特定结构
  • 复制目录内容

    我想将目录 tmp1 的内容复制到另一个目录 tmp2 tmp1 可能包含文件和其他目录 我想使用C C 复制tmp1的内容 包括模式 如果 tmp1 包含目录树 我想递归复制它们 最简单的解决方案是什么 我找到了一个解决方案来打开目录并读
  • 使用 Newtonsoft 和 C# 反序列化嵌套 JSON

    我正在尝试解析来自 Rest API 的 Json 响应 我可以获得很好的响应并创建了一些类模型 我正在使用 Newtonsoft 的 Json Net 我的响应中不断收到空值 并且不确定我的模型设置是否正确或缺少某些内容 例如 我想要获取
  • 如何创建包含 IPv4 地址的文本框? [复制]

    这个问题在这里已经有答案了 如何制作一个这样的文本框 我想所有的用户都见过这个并且知道它的功能 您可以使用带有 Mask 的 MaskedTestBox000 000 000 000 欲了解更多信息 请参阅文档 http msdn micr
  • 如何区分用户点击链接和页面自动重定向?

    拥有 C WebBrowser control http msdn microsoft com en us library system windows forms webbrowser aspx在我的 WinForms 应用程序中 并意识
  • java.io.Serialized 在 C/C++ 中的等价物是什么?

    C C 的等价物是什么java io Serialized https docs oracle com javase 7 docs api java io Serializable html 有对序列化库的引用 用 C 序列化数据结构 ht
  • 如何使用 LINQ2SQL 连接两个不同上下文的表?

    我的应用程序中有 2 个数据上下文 不同的数据库 并且需要能够通过上下文 B 中的表的右连接来查询上下文 A 中的表 我该如何在 LINQ2SQL 中执行此操作 Why 我们正在使用 SaaS 产品来跟踪我们的时间 项目等 并希望向该产品发
  • 具有交替类型的可变参数模板参数包

    我想知道是否可以使用参数包捕获交替参数模式 例如 template
  • 使用自定义堆的类似 malloc 的函数

    如果我希望使用自定义预分配堆构造类似 malloc 的功能 那么 C 中最好的方法是什么 我的具体问题是 我有一个可映射 类似内存 的设备 已将其放入我的地址空间中 但我需要获得一种更灵活的方式来使用该内存来存储将随着时间的推移分配和释放的
  • Azure 辅助角色“请求输入之一超出范围”的内部异常。

    我在辅助角色中调用 CloudTableClient CreateTableIfNotExist 方法 但收到一个异常 其中包含 请求输入之一超出范围 的内部异常 我做了一些研究 发现这是由于将表命名为非法表名引起的 但是 我尝试为我的表命
  • 为什么 std::strstream 被弃用?

    我最近发现std strstream已被弃用 取而代之的是std stringstream 我已经有一段时间没有使用它了 但它做了我当时需要做的事情 所以很惊讶听到它的弃用 我的问题是为什么做出这个决定 有什么好处std stringstr
  • CMake 无法确定目标的链接器语言

    首先 我查看了this https stackoverflow com questions 11801186 cmake unable to determine linker language with c发帖并找不到解决我的问题的方法 我
  • AES 128 CBC 蒙特卡罗测试

    我正在 AES 128 CBC 上执行 MCT 如中所述http csrc nist gov groups STM cavp documents aes AESAVS pdf http csrc nist gov groups STM ca
  • 如何设置 log4net 每天将我的文件记录到不同的文件夹中?

    我想将每天的所有日志保存在名为 YYYYMMdd 的文件夹中 log4net 应该根据系统日期时间处理创建新文件夹 我如何设置它 我想将一天中的所有日志保存到 n 个 1MB 的文件中 我不想重写旧文件 但想真正拥有一天中的所有日志 我该如
  • Cmake 链接共享库:包含库中的头文件时“没有这样的文件或目录”

    我正在学习使用 CMake 构建库 构建库的代码结构如下 include Test hpp ITest hpp interface src Test cpp ITest cpp 在 CMakeLists txt 中 我用来构建库的句子是 f
  • 使用 %d 打印 unsigned long long

    为什么我打印以下内容时得到 1 unsigned long long int largestIntegerInC 18446744073709551615LL printf largestIntegerInC d n largestInte
  • 使用 C# 读取 Soap 消息

  • C++ 函数重载类似转换

    我收到一个错误 指出两个重载具有相似的转换 我尝试了太多的事情 但没有任何帮助 这是那段代码 CString GetInput int numberOfInput BOOL clearBuffer FALSE UINT timeout IN
  • 从 Excel 应用程序对象中查找位数(32 位/64 位)?

    是否可以从 Microsoft Office Interop Excel ApplicationClass 确定 Excel 是以 32 位还是 64 位运行 Edit该解决方案应该适用于 Excel 2010 和 Excel 2007 此
  • 如何部署“SQL Server Express + EF”应用程序

    这是我第一次部署使用 SQL Server Express 数据库的应用程序 我首先使用实体 框架模型来联系数据库 我使用 Install Shield 创建了一个安装向导来安装应用程序 这些是我在目标计算机中安装应用程序所执行的步骤 安装

随机推荐

  • 如何在列出另一个文件之前清除列表文件?

    function listFiles var x document getElementById ResultShown value var date new Date date setDate date getDate 180 var n
  • 如何在集成测试中测试Mongo索引?

    我有一个 Java 方法 它在 Mongo 集合中的两个字段上创建索引 我应该获取集合的索引信息 然后检查索引的名称和字段是否正确 为此编写集成测试最简洁的方法是什么 使用自定义 Hamcrest 匹配器来查看索引是否在集合中是否有意义 在
  • 如何从命令行运行路径中包含空格的 PowerShell 脚本?

    因此 我尝试了多种不同的方法来从命令行运行 PowerShell 脚本 但每种方法都会返回错误 这是这条路径 C Users test Documents test line space PS Script test ps1 我已经尝试过这
  • 带有 Influxdb 的 Grafana 世界地图面板不显示点

    我安装了 Grafana 的世界地图面板 但无法在世界地图面板上显示点 我在看Grafana 世界地图 表数据源和其他链接但没有找到答案 我尝试使用country json 添加截屏我的声誉没有 10 所以我无法添加超过 2 个链接 我也尝
  • 实际上从堆中为对象分配了多少内存?

    我有一个程序使用太多内存来在堆上分配大量小对象 所以我想研究一下优化它的方法 该程序使用Visual C 7编译 有没有办法确定为给定对象实际分配了多少内存 我的意思是当我打电话时new堆分配不小于必要的数量 我怎样才能知道到底分配了多少
  • 从 bash 脚本运行 makefile 命令并返回错误结果代码

    我有一个 python makefile 我可以从我的 bash 脚本运行它的命令 如下所示 local make lint output make lint output make test unit 2 gt 1 echo make l
  • 将 PHP 变量从一个页面传递到另一个页面,再传递到另一个页面 [重复]

    这个问题在这里已经有答案了 可能的重复 在 PHP 中将值从页面传递到另一个页面 例如 我有3页 第一页将是一个常规的 HTML 登录页面 将要求输入用户名 第二页将通过 POST 函数获取此用户名变量并分配一个 php 变量 例如 use
  • Worklight http 适配器问题

    2个简单的问题 通过 http 适配器发出的所有 http 请求是否都会首先通过 worklight 服务器 如果是这样 那么是否意味着即使是对公共网站的 http 适配器请求 例如对 yahoo 网站的股票价格请求 也会首先通过 work
  • 自定义 XYJfree 图表中的条形颜色

    如何用不同的颜色绘制不同的条形 我尝试使用渲染器 这是我的示例代码 public IntervalXYDataset createDataset throws InterruptedException parseFile final XYS
  • .NET Standard 2.0 使用的兼容性填充程序

    概述 example NET Standard 2 0 表示它现在使用某种兼容性填充程序来修复第三方库兼容性问题 因此 您可以将第三方库与 NET Standard 一起使用 直到它不使用 NET Standard 没有的任何 API 不清
  • 如果 iCloud 设置为不同步提醒,则无法创建本地 EKCalendar(提醒)

    这里遇到了一个非常奇怪的问题 在我看来这是 EventKit API 的问题 我只是想检查一下我没有做什么 测试用例1 在应用程序的隐私中启用提醒 该设备有 iCloud 帐户 但设置为不同步提醒 我可以创建一个localApple 的 提
  • 从列表集合中删除重复项

    希望可以有人帮帮我 我正在使用 c 并且对它有点陌生 我正在将一个文本文件加载到我的应用程序中 并将数据拆分为 我正在将字符串的一部分读入
  • Android 设备上短信的默认字符集/编码是什么?

    如果有必要保持简单的话 我主要关心北美的英语手机 具体来说 当发送 接收短信和彩信时 字符是如何编码的 有区别吗 我的初步研究表明UTF 8是默认值 但我也看到了对US ASCII对于美国设备和其他区域设置的其他字符集 Quote 平台默认
  • 如何重新启动 TimerTask

    我编写了一个任务来通过套接字发送特定的 TCP 消息 我有一个包含一堆消息和一些时间戳的文件 因此我将该任务编程为 TimerTask 并使用具有第一个消息时间戳的计时器对其进行调度 当它完成时 任务运行方法结束 但其关联的线程仍然存在 并
  • AFNetworking 2.0下载多张图片完成

    我正在尝试找出一种使用 AFNewtorking 2 0 下载多个图像的方法 我在这里读了很多帖子 但找不到我正在寻找的答案 希望你们能帮助我 问题是我想知道所有下载何时完成以及所有图像是否已下载 所以我有一个带有图像 URL 的数组 蚂蚁
  • SQL 合并时出现 ORA-38104 错误的原因是什么?

    我有这样的代码 MERGE INTO target table tgt USING source table src on tgt c1 src c1 WHEN MATCHED THEN UPDATE SET tgt c1 src c2 I
  • 根据跨越边界的数量,用颜色突出显示超过或低于阈值的 matplotlib 点

    我有一个如下所示的图表 我为获取该图 8 个图的序列之一 而运行的代码如下 date list list df testing set date unique random date list list np random choice d
  • JSON4s 找不到带 Spark 的构造函数

    我在尝试在 Spark 作业中解析 json 时遇到了问题 我使用的是 Spark 1 1 0 json4s 和 Cassandra Spark 连接器以及 DSE 4 6 抛出的异常是 org json4s package Mapping
  • 适当的CSS以确保body元素填满整个屏幕

    我的身体元素有问题 似乎 100 占满了屏幕 但是 如果您将浏览器拖动得较小 然后向下滚动 则主体不会扩展 请参见这个jsFiddle作为一个很好的例子 height 100 是您网站显示的窗口的高度 而不是网站的高度 这会导致向下滚动时背
  • TPL数据流处理N条最新消息

    我正在尝试创建某种队列来处理收到的 N 个最新消息 现在我有这个 private static void SetupMessaging messagingBroadcastBlock new BroadcastBlock