下游块中的 TPL 数据流和异常处理

2024-01-10

我有以下伪代码:

var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
var a = new ActionBlock<int>(async item =>
    {
        await Task.Delay(500);
        Trace.TraceInformation(
            $"Target 1: | Type: {typeof(int).Name} | Thread: {Thread.CurrentThread.ManagedThreadId} | Message: {item}");
        // handling some logic but it throws
        if (item >= 5) throw new Exception("Something bad happened");

    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 });

queue.LinkTo(a, new DataflowLinkOptions { PropagateCompletion = true });

var targets = new List<ITargetBlock<int>> {queue};

var broadcaster = new ActionBlock<int>(
    async item =>
    {
        var processingTasks = targets.Select(async t =>
        {
            try
            {
                // This is condition is always false
                // t (bufferblock) has no exceptions. Exception is raised in downstream action block where it sends to
                if (!await t.SendAsync(item))
                    await t.Completion;
            }
            catch (Exception e)
            {
                Trace.TraceInformation("Handled exception : " + e.Message);
            }
        });

        try
        {
            // Neither here the exception is rethrowed
            await Task.WhenAll(processingTasks);
        }
        catch (Exception e)
        {
            Trace.TraceInformation("Handled exception WhenAll : " + e.Message);
        }
    });

for (var i = 1; i <= 10; i++)
{
    broadcaster.Post(i);
}

管道是这样配置的ActionBlock<int> => BufferBlock<int> => ActionBlock<int>.

最后ActionBlock<int>抛出异常,但它不会重新抛出到我想要处理它的源代码块。

如何重写此代码以便正确处理异常?


您可以找到该主题的官方指南here https://blogs.msdn.microsoft.com/pfxteam/2011/11/09/exception-handling-in-tpl-dataflow-networks/。总体解决方案是订阅所有区块Completion https://msdn.microsoft.com/en-us/library/system.threading.tasks.dataflow.idataflowblock.completion.aspx任务包括检查其状态,并在需要时替换有故障的块(也应该存储块的所有引用)。请参阅整篇文章以获取更多信息。

网络的行为Faulted blocks

  1. 保留消息
    为了避免消息损坏,故障块应该清除其消息队列并移动到Faulted状态 尽快地。有一个场景不符合 这条规则:源块保存目标保留的消息。 如果遇到内部异常的块有一条消息: 被目标保留,保留消息必须不 掉落,并且该块不应移动到Faulted状态 直到消息被释放或消费。

  2. 悬挂网络
    ...

    • 保留对网络中所有块的引用并使用Task.WaitAll https://msdn.microsoft.com/en-us/library/system.threading.tasks.task.waitall.aspx or Task.WhenAll https://msdn.microsoft.com/en-us/library/system.threading.tasks.task.whenall.aspx等待他们(同步或 异步)。如果一个块出现故障,它的Completion https://msdn.microsoft.com/en-us/library/system.threading.tasks.dataflow.idataflowblock.completion.aspx任务将 完成于Faulted state.
    • Use DataflowLinkOptions https://msdn.microsoft.com/en-us/library/system.threading.tasks.dataflow.dataflowlinkoptions.aspx with PropagateCompletion == true构建线性网络时。这将从中传播块完成 源到目标。在这种情况下,等待网络就足够了 叶块。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

下游块中的 TPL 数据流和异常处理 的相关文章

  • 编译时运算符

    有人可以列出 C 中可用的所有编译时运算符吗 C 中有两个运算符 无论操作数如何 它们的结果始终可以在编译时确定 它们是sizeof 1 and 2 当然 其他运算符的许多特殊用途可以在编译时解决 例如标准中列出的那些整数常量表达式 1 与
  • 如何使用 C# 中的参数将用户重定向到 paypal

    如果我有像下面这样的简单表格 我可以用它来将用户重定向到 PayPal 以完成付款
  • WCF RIA 服务 - 加载多个实体

    我正在寻找一种模式来解决以下问题 我认为这很常见 我正在使用 WCF RIA 服务在初始加载时将多个实体返回给客户端 我希望两个实体异步加载 以免锁定 UI 并且我想利用 RIA 服务来执行此操作 我的解决方案如下 似乎有效 这种方法会遇到
  • 不支持将数据直接绑定到存储查询(DbSet、DbQuery、DbSqlQuery)

    正在编码视觉工作室2012并使用实体模型作为我的数据层 但是 当页面尝试加载时 上面提到的标题 我使用 Linq 语句的下拉控件往往会引发未处理的异常 下面是我的代码 using AdventureWorksEntities dw new
  • 用于检查类是否具有运算符/成员的 C++ 类型特征[重复]

    这个问题在这里已经有答案了 可能的重复 是否可以编写一个 C 模板来检查函数是否存在 https stackoverflow com questions 257288 is it possible to write a c template
  • 查找c中结构元素的偏移量

    struct a struct b int i float j x struct c int k float l y z 谁能解释一下如何找到偏移量int k这样我们就可以找到地址int i Use offsetof 找到从开始处的偏移量z
  • Asp.NET WebApi 中类似文件名称的路由

    是否可以在 ASP NET Web API 路由配置中添加一条路由 以允许处理看起来有点像文件名的 URL 我尝试添加以下条目WebApiConfig Register 但这不起作用 使用 URIapi foo 0de7ebfa 3a55
  • 为什么当实例化新的游戏对象时,它没有向它们添加标签? [复制]

    这个问题在这里已经有答案了 using System Collections using System Collections Generic using UnityEngine public class Test MonoBehaviou
  • C#中如何移动PictureBox?

    我已经使用此代码来移动图片框pictureBox MouseMove event pictureBox Location new System Drawing Point e Location 但是当我尝试执行时 图片框闪烁并且无法识别确切
  • C++ OpenSSL 导出私钥

    到目前为止 我成功地使用了 SSL 但遇到了令人困惑的障碍 我生成了 RSA 密钥对 之前使用 PEM write bio RSAPrivateKey 来导出它们 然而 手册页声称该格式已经过时 实际上它看起来与通常的 PEM 格式不同 相
  • 创建链表而不将节点声明为指针

    我已经在谷歌和一些教科书上搜索了很长一段时间 我似乎无法理解为什么在构建链表时 节点需要是指针 例如 如果我有一个节点定义为 typedef struct Node int value struct Node next Node 为什么为了
  • 将多个表映射到实体框架中的单个实体类

    我正在开发一个旧数据库 该数据库有 2 个具有 1 1 关系的表 目前 我为每个定义的表定义了一种类型 1Test 1Result 我想将这些特定的表合并到一个类中 当前的类型如下所示 public class Result public
  • while 循环中的 scanf

    在这段代码中 scanf只工作一次 我究竟做错了什么 include
  • 如何在整个 ASP .NET MVC 应用程序中需要授权

    我创建的应用程序中 除了启用登录的操作之外的每个操作都应该超出未登录用户的限制 我应该添加 Authorize 每个班级标题前的注释 像这儿 namespace WebApplication2 Controllers Authorize p
  • 使用 x509 证书签署 json 文档或字符串

    如何使用 x509 证书签署 json 文档或字符串 public static void fund string filePath C Users VIKAS Desktop Data xml Read the file XmlDocum
  • 覆盖子类中的字段或属性

    我有一个抽象基类 我想声明一个字段或属性 该字段或属性在从该父类继承的每个类中具有不同的值 我想在基类中定义它 以便我可以在基类方法中引用它 例如覆盖 ToString 来表示 此对象的类型为 property field 我有三种方法可以
  • 链接器错误:已定义

    我尝试在 Microsoft Visual Studio 2012 中编译我的 Visual C 项目 使用 MFC 但出现以下错误 error LNK2005 void cdecl operator new unsigned int 2
  • 如何将带有 IP 地址的连接字符串放入 web.config 文件中?

    我们当前在 web config 文件中使用以下连接字符串 add name DBConnectionString connectionString Data Source ourServer Initial Catalog ourDB P
  • C# - OutOfMemoryException 在 JSON 文件上保存列表

    我正在尝试保存压力图的流数据 基本上我有一个压力矩阵定义为 double pressureMatrix new double e Data GetLength 0 e Data GetLength 1 基本上 我得到了其中之一pressur
  • 对来自流读取器的过滤数据执行小计

    编辑问题未得到解答 我有一个基于 1 个标准的过滤输出 前 3 个数字是 110 210 或 310 给出 3 个不同的组 从流阅读器控制台 问题已编辑 因为第一个答案是我给出的具体示例的字面解决方案 我使用的实际字符串长度为 450 个

随机推荐

  • 无法在 iOS 上使用自定义 @protocol

    注意 以下是使用启用了自动引用计数 ARC 的 iOS 我认为 ARC 可能与它不起作用有很大关系 因为这是根据我通过谷歌找到的示例设置的 我正在尝试创建一个协议来通知委托用户从 UITableView 选择的文件名 文件列表视图控制器 h
  • 离子搜索栏搜索不适用于 cypress {enter}

    我有一个 Ionic 6 应用程序 我正在使用 cypress 9 3 1 对其进行测试 在我的应用程序中 我使用像这样的离子搜索栏
  • 插入...值(SELECT ... FROM ...)

    我在尝试着INSERT INTO使用另一个表的输入的表 尽管这对于许多数据库引擎来说是完全可行的 但我似乎总是很难记住正确的语法SQL当天的发动机 MySQL http en wikipedia org wiki MySQL Oracle
  • WPF DPI 问题

    我开发了一个应用程序 在我的计算机上看起来很棒 但当我将其安装到具有不同分辨率和 DPI 设置的其他计算机上时 它看起来很糟糕 控件相互重叠 这真是太痛苦了 有人对如何避免这种情况有什么建议吗 Windows 无法知道屏幕的本机 DPI 每
  • Python3 - 无法读取 docx、odt 文件 - UnicodeDecodeError:“utf-8”编解码器无法解码位置 10 中的字节 0xea:无效的连续字节

    我正在尝试将大 docx 文件拆分为小文件 为此 当读取文件时python3 6使用以下代码 with open h docx r as f a f read 它抛出这个错误 Traceback most recent call last
  • Linux 中的沙箱

    我想创建一个 Web 应用程序 允许用户上传一些 C 代码 并查看其执行结果 代码将在服务器上编译 用户不受信任 这显然会带来巨大的安全隐患 所以我需要为应用程序创建某种沙箱 在最基本的层面上 我想将对文件系统的访问限制为某些指定的目录 我
  • Spring JPA 中的 @Entity 是什么?

    具体来说 我指的是javax persistence Entity 根据我将鼠标悬停在上面时显示的文档 在 VS Code 中它指出 指定该类是一个实体 该注解适用于 实体类 对于 Spring JPA 来说 类是实体意味着什么 Entit
  • GetComInterfaceForObject 是否固定对象?

    使用 GetComInterfaceForObject 并将返回的 IntPtr 传递给非托管代码是否会阻止托管对象在内存中移动 或者 clr 是否以某种方式维护该 ptr 请注意 非托管代码将在程序的生命周期内使用它 并且我需要确保托管对
  • 在单个测试类中测试接口的多个实现

    我需要通过班级级别的测试数据但是Theory and InlineData属性只能用在方法上 public class ContainerTests TestFixture private IContainer container publ
  • 在 GPU 上预加载整个数据集以训练 Keras 模型

    我有一个特定的情况 其中网络相对较小 为了收敛和泛化问题 我应该保持较小的批量大小 例如 256 这导致每个时期要处理数百个批量 不幸的是 在这种情况下 批量 加载和损失计算成为瓶颈 如timeline工具告诉我 在 TensorFlow
  • spring可以支持多应用共享成员资格吗?

    spring 框架是否支持共享公共用户群的多个应用程序 例如2 个独立的 Web 应用程序以某种方式连接到单个数据库以获取用户相关信息 用户名 密码 甚至角色 这个想法是这样的 类似于ASP NET 会员资格 http msdn micro
  • 图像隐写术抵御各种攻击的最佳实践是什么? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我对此真的很好奇 因为现在每个频道都可以以某种方式修改或压缩图像 这可以被视为对隐写术的攻击 我们可以将隐写术分为两种基本类型 第一种在图像的空
  • ClassNotFound launchig 地图活动在 Android 库中声明

    当尝试启动从 MapActivity 派生并在 Android 库项目中声明的活动 TestLocationActivity 时 我收到此异常 09 08 09 29 45 357 ERROR AndroidRuntime 7502 jav
  • 是否可以更改Matlab绘图功能中的标记? [复制]

    这个问题在这里已经有答案了 我正在尝试使用 matlab 绘图函数来创建绘图 然而 可用的标记是有限的 例如 plot x y o 将用圆形标记绘制 但是 如果我想要带有箭头符号或字母的标记 这是不可能的 有谁知道有什么方法可以做到这一点
  • Excel VBA 调试:循环不搜索整个范围

    我编写了一个基本宏来搜索范围 在一张纸中 然后根据保存选择值的第三张纸复制所选单元格 到另一张纸 我已经使用了 i x to y 的循环 但看起来宏正在跳过一些行 即 如果第 1 行到第 4 行有 4 个要复制的有效值 则宏仅复制第 2 行
  • 如何拥有连续的 Firebase 云函数来获取连续的数据流?

    我需要使用 Twitter Stream API 将推文数据流式传输到我的 firebase 云函数 如下所示 client stream statuses filter params stream gt stream on data tw
  • 为什么 .NET JIT 编译器决定不内联或优化对没有副作用的空静态方法的调用?

    我认为我观察到 NET JIT 编译器没有内联或优化对没有副作用的空静态方法的调用 考虑到一些定制的在线资源 这有点令人惊讶 我的环境是 x64 Windows 8 1 NET Framework 4 5 上的 Visual Studio
  • 从posenet提取关键点到json文件?

    我正在研究posenet的张量流实现来实时进行姿态估计 如果可能的话还可以在离线模式下进行 我正在研究以下存储库 https github com tensorflow tfjs models tree master posenet htt
  • 如何在 Maven 中调试工件替换

    我有一个父项目包含十几个子项目 其中一个子项目使用org apache httpcomponents httpclient jar 4 3 5 这取决于org apache httpcomponents httpcore jar 4 3 2
  • 下游块中的 TPL 数据流和异常处理

    我有以下伪代码 var queue new BufferBlock