TPL DataFlow处理异常的正确方法

2023-12-31

我在使用 TPL DataFlow 管理队列(数据库)并将工作重定向到网格计算服务的 Windows 服务中遇到问题。在某一时刻 BufferBlock 停止释放任务,我不知道为什么。我认为这是因为在执行某些任务期间发生了一些异常,但它们被抑制了,很难理解 BufferBlock 在什么时候停止接受新任务。

我试图在下面的工作示例中简化它。 它没有任何异常处理,我想知道如何正确处理 TPL 中的异常。 我在这里找到了类似的东西TPL数据流,仅当所有源数据块完成时才保证完成 https://stackoverflow.com/questions/13510094/tpl-dataflow-guarantee-completion-only-when-all-source-data-blocks-completed。 在这个例子中我有100个请求,并用10个请求批量处理数据。 模拟 ID % 9 == 0 时发生的一些异常 如果我没有捕获此异常,它会工作一点,然后停止接受新请求。 如果我处理并返回 Result.Failure 我相信它工作得很好,但我不确定这是否是在生产环境中使用它的正确方法。

我是 TPL 的新手,如果我没有更清楚地解释我的问题,请忘记我。GitHub 项目 https://github.com/dmitriydas/TestTPL

图像空槽 https://i.stack.imgur.com/6FEcK.png

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Timers;
using CSharpFunctionalExtensions;

namespace TestTPL
{
    public class ServicePipeline
    {
        public const int batches = 100;
        private int currentBatch = 0;

        public ServicePipeline(int maxRequestsInParallel)
        {
            MaxRequestsInParallel = maxRequestsInParallel;
        }

        public int MaxRequestsInParallel { get; }
        public BufferBlock<MyData> QueueBlock { get; private set; }
        public List<TransformBlock<MyData, Result>> ExecutionBlocks
            { get; private set; }
        public ActionBlock<Result> ResultBlock { get; private set; }

        private void Init()
        {
            QueueBlock = new BufferBlock<MyData>(new DataflowBlockOptions()
                { BoundedCapacity = MaxRequestsInParallel });
            ExecutionBlocks = new List<TransformBlock<MyData, Result>>();
            ResultBlock = new ActionBlock<Result>(_ => _.OnFailure(
                () => Console.WriteLine($"Error: {_.Error}")));

            for (int blockIndex = 0; blockIndex < MaxRequestsInParallel;
                blockIndex++)
            {
                var executionBlock = new TransformBlock<MyData, Result>((d) =>
                {
                    return ExecuteAsync(d);
                }, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
                executionBlock.LinkTo(ResultBlock, new DataflowLinkOptions()
                    { PropagateCompletion = true });
                QueueBlock.LinkTo(executionBlock, new DataflowLinkOptions()
                    { PropagateCompletion = true });
                ExecutionBlocks.Add(executionBlock);
            }
        }

        public static Result ExecuteAsync(MyData myData)
        {
            //try
            //{
            WebClient web = new WebClient();
            TaskCompletionSource<Result> res = new TaskCompletionSource<Result>();
            Task task = Task<Result>.Run(() => web.DownloadStringAsync(
                new Uri("http://localhost:49182/Slow.ashx")));
            task.Wait();
            Console.WriteLine($"Data = {myData}");
            if (myData != null && myData.Id % 9 == 0)
                throw new Exception("Test");
            return Result.Ok();
            //}
            //catch (Exception ex)
            //{
            //    return Result.Failure($"Exception: {ex.Message}");
            //}
        }

        public async void Start()
        {
            Init();
            while (currentBatch < batches)
            {
                Thread.Sleep(1000);
                await SubmitNextRequests();
            }
            Console.WriteLine($"Completed: {batches}");
        }

        private async Task<int> SubmitNextRequests()
        {
            var emptySlots = MaxRequestsInParallel - QueueBlock.Count;
            Console.WriteLine($"Empty slots: {emptySlots}" +
                $", left = {batches - currentBatch}");
            if (emptySlots > 0)
            {
                var dataRequests = await GetNextRequests(emptySlots);
                foreach (var data in dataRequests)
                {
                    await QueueBlock.SendAsync(data);
                }
            }
            return emptySlots;
        }

        private async Task<List<MyData>> GetNextRequests(int request)
        {
            MyData[] myDatas = new MyData[request];
            Task<List<MyData>> task = Task<List<MyData>>.Run(() =>
            {
                for (int i = 0; i < request; i++)
                {
                    myDatas[i++] = new MyData(currentBatch);
                    currentBatch++;
                }
                return new List<MyData>(myDatas);
            });
            return await task;
        }
    }

    public class MyData
    {
        public int Id { get; set; }
        public MyData(int id) => Id = id;
        public override string ToString() { return Id.ToString(); }
    }
}

编辑:2019 年 10 月 30 日当异常被显式处理和调用时,它会按预期工作Result.Failure($"异常:{ex.Message}");

    public static Result ExecuteAsync(MyData myData)
    {
        try
        {
            WebClient web = new WebClient();
            TaskCompletionSource<Result> res = new TaskCompletionSource<Result>();
            Task task = Task<Result>.Run(() => Thread.Sleep(2000));
            task.Wait();
            Console.WriteLine($"Data = {myData}");
            if (myData != null && myData.Id % 9 == 0)
                throw new Exception("Test");
            return Result.Ok();
        }
        catch (Exception ex)
        {
            return Result.Failure($"Exception: {ex.Message}");
        }
    }

当链接两​​个块时,可以选择向前传播完成,但不能向后传播。当BoundedCapacity使用选项,会发生错误,因为它会阻塞管道的馈线并导致死锁。不过,手动传播完成是很容易的。这是您可以使用的方法。

async void OnErrorComplete(IDataflowBlock block1, IDataflowBlock block2)
{
    await Task.WhenAny(block1.Completion); // Safe awaiting
    if (block1.Completion.IsFaulted) block2.Complete();
}

它异步等待block1完成,如果失败则立即完成block2。完成上游块通常就足够了,但如果需要,您也可以传播特定的异常:

async void OnErrorPropagate(IDataflowBlock block1, IDataflowBlock block2)
{
    await Task.WhenAny(block1.Completion); // Safe awaiting
    if (block1.Completion.IsFaulted)
        block2.Fault(block1.Completion.Exception.InnerException);
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

TPL DataFlow处理异常的正确方法 的相关文章

  • 通过引用传递 [C++]、[Qt]

    我写了这样的东西 class Storage public Storage QString key const int value const void add item QString int private QMap
  • C++11 删除重写方法

    Preface 这是一个关于最佳实践的问题 涉及 C 11 中引入的删除运算符的新含义 当应用于覆盖继承父类的虚拟方法的子类时 背景 根据标准 引用的第一个用例是明确禁止调用某些类型的函数 否则转换将是隐式的 例如最新版本第 8 4 3 节
  • 如何从 Visual Studio 将视图导航到其控制器?

    问题是解决方案资源管理器上有 29 个项目 而且项目同时具有 ASP NET MVC 和 ASP NET Web 表单结构 在MVC部分中 Controller文件夹中有大约100个子文件夹 每个文件夹至少有3 4个控制器 视图完全位于不同
  • 为什么 GCC 不允许我创建“内联静态 std::stringstream”?

    我将直接前往 MCVE include
  • 如何从本机 C(++) DLL 调用 .NET (C#) 代码?

    我有一个 C app exe 和一个 C my dll my dll NET 项目链接到本机 C DLL mynat dll 外部 C DLL 接口 并且从 C 调用 C DLL 可以正常工作 通过使用 DllImport mynat dl
  • 从经典 ASP 调用 .Net C# DLL 方法

    我正在开发一个经典的 asp 项目 该项目需要将字符串发送到 DLL DLL 会将其序列化并发送到 Zebra 热敏打印机 我已经构建了我的 DLL 并使用它注册了regasm其次是 代码库这使得 IIS 能够识别它 虽然我可以设置我的对象
  • WPF 数据绑定到复合类模式?

    我是第一次尝试 WPF 并且正在努力解决如何将控件绑定到使用其他对象的组合构建的类 例如 如果我有一个由两个单独的类组成的类 Comp 为了清楚起见 请注意省略的各种元素 class One int first int second cla
  • 重载 (c)begin/(c)end

    我试图超载 c begin c end类的函数 以便能够调用 C 11 基于范围的 for 循环 它在大多数情况下都有效 但我无法理解和解决其中一个问题 for auto const point fProjectData gt getPoi
  • ASP.NET Core 3.1登录后如何获取用户信息

    我试图在登录 ASP NET Core 3 1 后获取用户信息 如姓名 电子邮件 id 等信息 这是我在登录操作中的代码 var claims new List
  • C# xml序列化必填字段

    我需要将一些字段标记为需要写入 XML 文件 但没有成功 我有一个包含约 30 个属性的配置类 这就是为什么我不能像这样封装所有属性 public string SomeProp get return someProp set if som
  • C 编程:带有数组的函数

    我正在尝试编写一个函数 该函数查找行为 4 列为 4 的二维数组中的最大值 其中二维数组填充有用户输入 我知道我的主要错误是函数中的数组 但我不确定它是什么 如果有人能够找到我出错的地方而不是编写新代码 我将不胜感激 除非我刚去南方 我的尝
  • 引用的程序集自动由 Visual Studio 替换

    我有 2 个项目 一个可移植类库和一个常规单元测试项目 在可移植类库中 我使用 NuGet 来引用 Microsoft BCL 可移植包 它附带 2 个程序集 System Threading Tasks dll and System Ru
  • 为什么使用小于 32 位的整数?

    我总是喜欢使用最小尺寸的变量 这样效果就很好 但是如果我使用短字节整数而不是整数 并且内存是 32 位字可寻址 这真的会给我带来好处吗 编译器是否会做一些事情来增强内存使用 对于局部变量 它可能没有多大意义 但是在具有数千甚至数百万项的结构
  • 如何在 Linq to SQL 中使用distinct 和 group by

    我正在尝试将以下 sql 转换为 Linq 2 SQL select groupId count distinct userId from processroundissueinstance group by groupId 这是我的代码
  • C 函数 time() 如何处理秒的小数部分?

    The time 函数将返回自 1970 年以来的秒数 我想知道它如何对返回的秒数进行舍入 例如 对于100 4s 它会返回100还是101 有明确的定义吗 ISO C标准没有说太多 它只说time 回报 该实现对当前日历时间的最佳近似 结
  • 在 WPF 中使用 ReactiveUI 提供长时间运行命令反馈的正确方法

    我有一个 C WPF NET 4 5 应用程序 用户将用它来打开某些文件 然后 应用程序将经历很多动作 读取文件 通过许多插件和解析器传递它 这些文件可能相当大 gt 100MB 因此这可能需要一段时间 我想让用户了解 UI 中发生的情况
  • C# 中的 IPC 机制 - 用法和最佳实践

    不久前我在 Win32 代码中使用了 IPC 临界区 事件和信号量 NET环境下场景如何 是否有任何教程解释所有可用选项以及何时使用以及为什么 微软最近在IPC方面的东西是Windows 通信基础 http en wikipedia org
  • 为什么C++代码执行速度比java慢?

    我最近用 Java 编写了一个计算密集型算法 然后将其翻译为 C 令我惊讶的是 C 的执行速度要慢得多 我现在已经编写了一个更短的 Java 测试程序和一个相应的 C 程序 见下文 我的原始代码具有大量数组访问功能 测试代码也是如此 C 的
  • C# 使用“?” if else 语句设置值这叫什么

    嘿 我刚刚看到以下声明 return name null name NA 我只是想知道这在 NET 中叫什么 是吗 代表即然后执行此操作 这是一个俗称的 条件运算符 三元运算符 http en wikipedia org wiki Tern
  • DotNetZip:如何提取文件,但忽略zip文件中的路径?

    尝试将文件提取到给定文件夹 忽略 zip 文件中的路径 但似乎没有办法 考虑到其中实现的所有其他好东西 这似乎是一个相当基本的要求 我缺少什么 代码是 using Ionic Zip ZipFile zf Ionic Zip ZipFile

随机推荐

  • glibc的写入是如何工作的?

    我尝试编译一个简单的程序 名为write with nostdlib 但我收到错误 path to file 3 undefined reference to write 我想write是 Unix 的东西并且一直存在 但显然不是 事实证明
  • 解析一个数字但保留负数

    我正在尝试将数字取消格式化为其原始形式 但保留它是否为负数 堆栈溢出上的某人引导我找到了这段代码 该代码工作得非常好 但它没有保留负数 有人能帮我更好地解决这个问题吗 EDIT 对于美元货币 普通数字 Example 1 234 1234
  • 如何更新 OpenJDK 的时区信息?

    如何更新 OpenJDK 的时区信息 Oracle 推出了 tzupdater 但它受到他们的许可证的约束 所以我不想使用它 我正在寻找一个开源替代方案 它允许我只更新时区信息而不是整个 JRE Azul 最近发布了一个开源工具来更新 TZ
  • 用随机数据填充表

    我有如下两张表 区域 表 AreaKey AreaID
  • 以编程方式将 NSScrollView 滚动到右侧

    一切都在标题中 我想以编程方式滚动NSScrollView向右 这样我就可以看到文档的结尾 我试过这个 let width scrollView frame size width let height scrollView frame si
  • 无法运行“phonegap run android”,抛出异常

    我想开始使用 Phonegap 开发东西 我按照他们网站上的说明进行操作 http phonegap com install http phonegap com install 当我执行 phonegap run android 时 它给了
  • 避免PHP执行时间限制

    我需要用 PHP 语言创建一个脚本来执行数字排列 但 PHP 的执行时间限制设置为 60 秒 我怎样才能运行脚本 以便在需要运行超过60个sesunde时 不被服务器中断 我知道我可以更改 php 中的最大执行时间限制 但我想听到另一个不需
  • 获取 woocommerce 子类别产品

    我正在尝试让 woocommerce 子类别中的产品显示在主要类别下 ul class wsubcategs li a href a li ul
  • 使用 ruby​​ 加密数据,使用 Node 解密

    我想在 ruby 应用程序中加密一些数据 然后在 nodejs 应用程序中对其进行解码 我一直在尝试让它发挥作用 现在我只是尝试用两种语言加密同一段数据以获得相同的结果 但我似乎无法做到这一点 js var crypto require c
  • 在 Log4j2 中扩展 PatternLayout

    自从 Log4J2 以来org apache logging log4j core layout PatternLayout班级是final 我无法扩展它来为我的创建标头CSV 我引用了文档 它没有提供有关如何扩展现有布局的信息 http
  • 具有状态存储的 Kafka Streams - 应用程序重新启动时重新处理消息

    我们有以下带有两个变压器的拓扑 每个变压器都使用持久状态存储 kStreamBuilder stream inboundTopicName transform gt new FirstTransformer FIRST STATE STOR
  • 将文件添加到单击一次部署

    我有一个要转移到 ClickOnce 的应用程序 该应用程序有一个大小适中的数据文件夹 其中包含我需要在部署中包含的数百个文件 部署后 该文件夹需要位于与 EXE 相同的位置 我已经看到了一些关于如何执行此操作的建议 但似乎没有达成一致的方
  • og 元标签、社交按钮和 angularjs

    我正在创建一个使用多个视图的网站 标签和页面的标签通过 rootScope 变量进行更改 所以我有类似的东西 每当每个视图加载到网站上时 page title 变量都会发生变化 标题和 og title 标签也会更新 一切都按预期工作 问题
  • 使用特定 SVN 版本时,PIP 总是重新安装包

    当指定特定 SVN 修订版时 PIP 始终下载并安装包 显着减慢同步过程 有没有解决的办法 正常情况下pip会检测到环境中已经安装了该包并提示使用 upgrade 我的 pip requirements 文件具有以下行 svn http c
  • SymEnumSymbols 返回 ERROR_SUCCESS 但没有给出结果

    我正在尝试从已加载的 DLL 中枚举符号 对于那些有兴趣的人来说 这是CPC覆盖项目 https github com atlaste CPPCoverage 对于某些功能我需要符号数据 问题分解 当进程启动或加载 DLL 时 需要为已计划
  • 如何在自动布局中使用约束标识符以及如何使用标识符更改约束? [迅速]

    当我在 Xcode 7 中编辑约束时 我发现标识符Interface Builder 中的字段 约束的标识符是什么 如何使用它 使用标识符我可以以编程方式访问约束并更改常量吗 我的问题是该标识符为什么有帮助以及有何帮助 有没有办法通过在子视
  • 如何使用带有空格键的 Bootstrap Tags 输入插件提交标签?

    我正在构建一个带有字段的表单 该字段使用Bootstrap标签输入插件 http timschlechter github io bootstrap tagsinput examples 一旦用户单击该插件 该插件就会输入一个标签Enter
  • 获取列表中函数的名称

    我希望实现什么 所以我想在函数列表中获取函数的名称 这是一个例子 foo list foo1 sum foo2 mean 我想从中提取什么foo is list sum mean 我希望它是一个函数 意思是 gt foo list foo1
  • 使用反射从dll调用方法后获取返回值

    我正在使用反射加载 dll 并尝试调用返回List
  • TPL DataFlow处理异常的正确方法

    我在使用 TPL DataFlow 管理队列 数据库 并将工作重定向到网格计算服务的 Windows 服务中遇到问题 在某一时刻 BufferBlock 停止释放任务 我不知道为什么 我认为这是因为在执行某些任务期间发生了一些异常 但它们被