使用 Batchblock.Triggerbatch() 在 TPL 数据流管道中进行数据传播

2024-01-21

在我的生产者-消费者场景中,我有多个消费者,每个消费者都向外部硬件发送一个操作,这可能需要一些时间。我的管道看起来有点像这样:

BatchBlock --> TransformBlock --> BufferBlock --> (几个) ActionBlocks

我已将 ActionBlocks 的 BoundedCapacity 指定为 1。 理论上我想要的是,仅当我的一个操作块可用于操作时,我才想触发 Batchblock 将一组项目发送到 Transformblock。直到那时,Batchblock 应该只保留缓冲元素,而不是将它们传递到 Transformblock。我的批量大小是可变的。由于 Batchsize 是强制性的,我对 BatchBlock 批量大小确实有一个非常高的上限,但是我真的不希望达到该限制,我想根据执行上述任务的 Actionblocks 的可用性来触发我的批次。

我在 Triggerbatch() 方法的帮助下实现了这一点。我将 Batchblock.Triggerbatch() 称为 ActionBlock 中的最后一个操作。然而有趣的是,经过几天的正常工作后,管道出现了故障。经过检查,我发现有时批处理块的输入是在 ActionBlock 完成工作后才输入的。在这种情况下,ActionBlock 在其工作结束时实际上会调用 Triggerbatch,但是由于此时根本没有输入到 Batchblock,因此对 TriggerBatch 的调用是徒劳的。一段时间后,当输入流入 Batch 块时,就没有人可以调用 TriggerBatch 并重新启动 Pipeline。我一直在寻找可以检查 Batchblock 的输入缓冲区中是否确实存在某些内容的东西,但是没有这样的功能可用,我也找不到一种方法来检查 TriggerBatch 是否有效。

谁能建议一个可能的解决方案来解决我的问题。不幸的是,使用计时器来触发批次对我来说不是一个选择。除了 Pipeline 的启动之外,节流应该仅由 ActionBlock 之一的可用性来控制。

示例代码在这里:

    static BatchBlock<int> _groupReadTags;

    static void Main(string[] args)
    {
        _groupReadTags = new BatchBlock<int>(1000);

        var bufferOptions = new DataflowBlockOptions{BoundedCapacity = 2};
        BufferBlock<int> _frameBuffer = new BufferBlock<int>(bufferOptions);
        var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1};
        int batchNo = 1;


        TransformBlock<int[], int> _workingBlock = new TransformBlock<int[], int>(list =>
        {

            Console.WriteLine("\n\nWorking on Batch Number {0}", batchNo);
            //_groupReadTags.TriggerBatch();
            int sum = 0;

            foreach (int item in list)
            {
                Console.WriteLine("Elements in batch {0} :: {1}", batchNo, item);
                sum += item;

            }
            batchNo++;
            return sum;

        });

            ActionBlock<int> _worker1 = new ActionBlock<int>(async x =>
            {
                Console.WriteLine("Number from ONE :{0}",x);
                await Task.Delay(500);

                    Console.WriteLine("BatchBlock Output Count : {0}", _groupReadTags.OutputCount);

                _groupReadTags.TriggerBatch();



        },consumerOptions);

        ActionBlock<int> _worker2 = new ActionBlock<int>(async x =>
        {
            Console.WriteLine("Number from TWO :{0}", x);
            await Task.Delay(2000);
            _groupReadTags.TriggerBatch();

        }, consumerOptions);

        _groupReadTags.LinkTo(_workingBlock);
        _workingBlock.LinkTo(_frameBuffer);
        _frameBuffer.LinkTo(_worker1);
        _frameBuffer.LinkTo(_worker2);

        _groupReadTags.Post(10);
        _groupReadTags.Post(20);
        _groupReadTags.TriggerBatch();

        Task postingTask = new Task(() => PostStuff());
        postingTask.Start();
        Console.ReadLine();

    }



    static void PostStuff()
    {


        for (int i = 0; i < 10; i++)
            {
                _groupReadTags.Post(i);
                Thread.Sleep(100);
            }

        Parallel.Invoke(
            () => _groupReadTags.Post(100),
            () => _groupReadTags.Post(200),
            () => _groupReadTags.Post(300),
            () => _groupReadTags.Post(400),
            () => _groupReadTags.Post(500),
            () => _groupReadTags.Post(600),
            () => _groupReadTags.Post(700),
            () => _groupReadTags.Post(800)
                       );
    }

这是一个替代方案BatchBlock https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.batchblock-1具有一些额外功能的实现。它包括一个TriggerBatch具有此签名的方法:

public int TriggerBatch(int nextMinBatchSizeIfEmpty);

如果输入队列不为空,调用此方法将立即触发批处理,否则将设置临时MinBatchSize这只会影响下一批。您可以使用较小的值来调用此方法nextMinBatchSizeIfEmpty确保在当前无法生产批次的情况下,下一个批次将比配置的时间更早发生BatchSize在块的构造函数中。

此方法返回生产批次的大小。它返回0如果输入队列为空,或者输出队列已满,或者块已完成。

public class BatchBlockEx<T> : ITargetBlock<T>, ISourceBlock<T[]>
{
    private readonly ITargetBlock<T> _input;
    private readonly IPropagatorBlock<T[], T[]> _output;
    private readonly Queue<T> _queue;
    private readonly object _locker = new object();
    private int _nextMinBatchSize = Int32.MaxValue;

    public Task Completion { get; }
    public int InputCount { get { lock (_locker) return _queue.Count; } }
    public int OutputCount => ((BufferBlock<T[]>)_output).Count;
    public int BatchSize { get; }

    public BatchBlockEx(int batchSize, DataflowBlockOptions dataflowBlockOptions = null)
    {
        if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));
        dataflowBlockOptions = dataflowBlockOptions ?? new DataflowBlockOptions();
        if (dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded &&
            dataflowBlockOptions.BoundedCapacity < batchSize)
            throw new ArgumentOutOfRangeException(nameof(batchSize),
            "Number must be no greater than the value specified in BoundedCapacity.");

        this.BatchSize = batchSize;

        _output = new BufferBlock<T[]>(dataflowBlockOptions);

        _queue = new Queue<T>(batchSize);

        _input = new ActionBlock<T>(async item =>
        {
            T[] batch = null;
            lock (_locker)
            {
                _queue.Enqueue(item);
                if (_queue.Count == batchSize || _queue.Count >= _nextMinBatchSize)
                {
                    batch = _queue.ToArray(); _queue.Clear();
                    _nextMinBatchSize = Int32.MaxValue;
                }
            }
            if (batch != null) await _output.SendAsync(batch).ConfigureAwait(false);

        }, new ExecutionDataflowBlockOptions()
        {
            BoundedCapacity = 1,
            CancellationToken = dataflowBlockOptions.CancellationToken
        });

        var inputContinuation = _input.Completion.ContinueWith(async t =>
        {
            try
            {
                T[] batch = null;
                lock (_locker)
                {
                    if (_queue.Count > 0)
                    {
                        batch = _queue.ToArray(); _queue.Clear();
                    }
                }
                if (batch != null) await _output.SendAsync(batch).ConfigureAwait(false);
            }
            finally
            {
                if (t.IsFaulted)
                {
                    _output.Fault(t.Exception.InnerException);
                }
                else
                {
                    _output.Complete();
                }
            }
        }, TaskScheduler.Default).Unwrap();

        this.Completion = Task.WhenAll(inputContinuation, _output.Completion);
    }

    public void Complete() => _input.Complete();
    void IDataflowBlock.Fault(Exception ex) => _input.Fault(ex);

    public int TriggerBatch(Func<T[], bool> condition, int nextMinBatchSizeIfEmpty)
    {
        if (nextMinBatchSizeIfEmpty < 1)
            throw new ArgumentOutOfRangeException(nameof(nextMinBatchSizeIfEmpty));
        int count = 0;
        lock (_locker)
        {
            if (_queue.Count > 0)
            {
                T[] batch = _queue.ToArray();
                if (condition == null || condition(batch))
                {
                    bool accepted = _output.Post(batch);
                    if (accepted) { _queue.Clear(); count = batch.Length; }
                }
                _nextMinBatchSize = Int32.MaxValue;
            }
            else
            {
                _nextMinBatchSize = nextMinBatchSizeIfEmpty;
            }
        }
        return count;
    }

    public int TriggerBatch(Func<T[], bool> condition)
        => TriggerBatch(condition, Int32.MaxValue);

    public int TriggerBatch(int nextMinBatchSizeIfEmpty)
        => TriggerBatch(null, nextMinBatchSizeIfEmpty);

    public int TriggerBatch() => TriggerBatch(null, Int32.MaxValue);

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(
        DataflowMessageHeader messageHeader, T messageValue,
        ISourceBlock<T> source, bool consumeToAccept)
    {
        return _input.OfferMessage(messageHeader, messageValue, source,
            consumeToAccept);
    }

    T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target, out bool messageConsumed)
    {
        return _output.ConsumeMessage(messageHeader, target, out messageConsumed);
    }

    bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
    {
        return _output.ReserveMessage(messageHeader, target);
    }

    void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
    {
        _output.ReleaseReservation(messageHeader, target);
    }

    IDisposable ISourceBlock<T[]>.LinkTo(ITargetBlock<T[]> target,
        DataflowLinkOptions linkOptions)
    {
        return _output.LinkTo(target, linkOptions);
    }
}

另一个超载TriggerBatch方法允许检查当前可以生产的批次,并决定是否应该触发它:

public int TriggerBatch(Func<T[], bool> condition);

The BatchBlockEx类不支持Greedy https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.groupingdataflowblockoptions.greedy and MaxNumberOfGroups https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.groupingdataflowblockoptions.maxnumberofgroups内置选项BatchBlock.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 Batchblock.Triggerbatch() 在 TPL 数据流管道中进行数据传播 的相关文章

  • 编译时运算符

    有人可以列出 C 中可用的所有编译时运算符吗 C 中有两个运算符 无论操作数如何 它们的结果始终可以在编译时确定 它们是sizeof 1 and 2 当然 其他运算符的许多特殊用途可以在编译时解决 例如标准中列出的那些整数常量表达式 1 与
  • 如何使用 C# 中的参数将用户重定向到 paypal

    如果我有像下面这样的简单表格 我可以用它来将用户重定向到 PayPal 以完成付款
  • 为什么当实例化新的游戏对象时,它没有向它们添加标签? [复制]

    这个问题在这里已经有答案了 using System Collections using System Collections Generic using UnityEngine public class Test MonoBehaviou
  • 在 Windows 窗体中保存带有 Alpha 通道的单色位图会保存不同(错误)的颜色

    在 C NET 2 0 Windows 窗体 Visual Studio Express 2010 中 我保存由相同颜色组成的图像 Bitmap bitmap new Bitmap width height PixelFormat Form
  • 将 VSIX 功能添加到 C# 类库

    我有一个现有的单文件生成器 位于 C 类库中 如何将 VSIX 项目级功能添加到此项目 最终目标是编译我的类库项目并获得 VSIX 我实际上是在回答我自己的问题 这与Visual Studio 2017 中的单文件生成器更改 https s
  • C# 中通过 Process.Kill() 终止的进程的退出代码

    如果在我的 C 应用程序中 我正在创建一个可以正常终止或开始行为异常的子进程 在这种情况下 我通过调用 Process Kill 来终止它 但是 我想知道该进程是否已退出通常情况下 我知道我可以获得终止进程的错误代码 但是正常的退出代码是什
  • 创建链表而不将节点声明为指针

    我已经在谷歌和一些教科书上搜索了很长一段时间 我似乎无法理解为什么在构建链表时 节点需要是指针 例如 如果我有一个节点定义为 typedef struct Node int value struct Node next Node 为什么为了
  • 将多个表映射到实体框架中的单个实体类

    我正在开发一个旧数据库 该数据库有 2 个具有 1 1 关系的表 目前 我为每个定义的表定义了一种类型 1Test 1Result 我想将这些特定的表合并到一个类中 当前的类型如下所示 public class Result public
  • WCF 中 SOAP 消息的数字签名

    我在 4 0 中有一个 WCF 服务 我需要向 SOAP 响应添加数字签名 我不太确定实际上应该如何完成 我相信响应应该类似于下面的链接中显示的内容 https spaces internet2 edu display ISWG Signe
  • 如何设计以 char* 指针作为类成员变量的类?

    首先我想介绍一下我的情况 我写了一些类 将 char 指针作为私有类成员 而且这个项目有 GUI 所以当单击按钮时 某些函数可能会执行多次 这些类是设计的单班在项目中 但是其中的某些函数可以执行多次 然后我发现我的项目存在内存泄漏 所以我想
  • SolrNet连接说明

    为什么 SolrNet 连接的容器保持静态 这是一个非常大的错误 因为当我们在应用程序中向应用程序发送异步请求时 SolrNet 会表现异常 在 SolrNet 中如何避免这个问题 class P static void M string
  • 转发声明和包含

    在使用库时 无论是我自己的还是外部的 都有很多带有前向声明的类 根据情况 相同的类也包含在内 当我使用某个类时 我需要知道该类使用的某些对象是前向声明的还是 include d 原因是我想知道是否应该包含两个标题还是只包含一个标题 现在我知
  • 使用 x509 证书签署 json 文档或字符串

    如何使用 x509 证书签署 json 文档或字符串 public static void fund string filePath C Users VIKAS Desktop Data xml Read the file XmlDocum
  • 如何使用 C# / .Net 将文件列表从 AWS S3 下载到我的设备?

    我希望下载存储在 S3 中的多个图像 但目前如果我只能下载一个就足够了 我有对象路径的信息 当我运行以下代码时 出现此错误 遇到错误 消息 读取对象时 访问被拒绝 我首先做一个亚马逊S3客户端基于我的密钥和访问配置的对象连接到服务器 然后创
  • 对现有视频添加水印

    我正在寻找一种用 C 在视频上加水印的方法 就像在上面写文字一样 图片或文字标签 我该怎么做 谢谢 您可以使用 Nreco 视频转换器 代码看起来像 NReco VideoConverter FFMpegConverter wrap new
  • 通过指向其基址的指针删除 POD 对象是否安全?

    事实上 我正在考虑那些微不足道的可破坏物体 而不仅仅是POD http en wikipedia org wiki Plain old data structure 我不确定 POD 是否可以有基类 当我读到这个解释时is triviall
  • 测试用例执行完成后,无论是否通过,如何将测试用例结果保存在变量中?

    我正在使用 NUNIT 在 Visual Studio 中使用 Selenium WebDriver 测试用例的代码是 我想在执行测试用例后立即在变量中记录测试用例通过或失败的情况 我怎样才能实现这一点 NUnit 假设您使用 NUnit
  • 是否可以在 .NET Core 中将 gRPC 与 HTTP/1.1 结合使用?

    我有两个网络服务 gRPC 客户端和 gRPC 服务器 服务器是用 NET Core编写的 然而 客户端是托管在 IIS 8 5 上的 NET Framework 4 7 2 Web 应用程序 所以它只支持HTTP 1 1 https le
  • 如何在文本框中插入图像

    有没有办法在文本框中插入图像 我正在开发一个聊天应用程序 我想用图标图像更改值 等 但我找不到如何在文本框中插入图像 Thanks 如果您使用 RichTextBox 进行聊天 请查看Paste http msdn microsoft co
  • 如何防止用户控件表单在 C# 中处理键盘输入(箭头键)

    我的用户控件包含其他可以选择的控件 我想实现使用箭头键导航子控件的方法 问题是家长控制拦截箭头键并使用它来滚动其视图什么是我想避免的事情 我想自己解决控制内容的导航问题 我如何控制由箭头键引起的标准行为 提前致谢 MTH 这通常是通过重写

随机推荐