为什么从给定订阅者抛出异常时永远不会调用 OnError 回调?

2023-12-05

请观察以下单元测试:

using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace UnitTests
{
    [TestClass]
    public class TestRx
    {
        public const int UNIT_TEST_TIMEOUT = 5000;

        private static IObservable<int> GetObservable(int count = 100, int msWait = 10)
        {
            return Observable.Create<int>(async (obs, cancellationToken) =>
            {
                for (int i = 0; i < count && !cancellationToken.IsCancellationRequested; ++i)
                {
                    int value = i;
                    obs.OnNext(await Task.Factory.StartNew(() =>
                    {
                        Thread.Sleep(msWait);
                        return value;
                    }));
                }
            });
        }

        [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
        public void Subscribe()
        {
            var tcs = new TaskCompletionSource<object>();
            int i = 0;
            GetObservable().Subscribe(n =>
            {
                Assert.AreEqual(i, n);
                ++i;
            }, e => Assert.Fail(), () =>
            {
                Assert.AreEqual(100, i);
                tcs.TrySetResult(null);
            });

            tcs.Task.Wait();
        }

        [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
        public void SubscribeCancel()
        {
            var tcs = new TaskCompletionSource<object>();
            var cts = new CancellationTokenSource();
            int i = 0;
            GetObservable().Subscribe(n =>
            {
                Assert.AreEqual(i, n);
                ++i;
                if (i == 5)
                {
                    cts.Cancel();
                }
            }, e =>
            {
                Assert.IsTrue(i < 100);
                tcs.TrySetResult(null);
            }, () =>
            {
                Assert.IsTrue(i < 100);
                tcs.TrySetResult(null);
            }, cts.Token);

            tcs.Task.Wait();
        }

        [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
        public void SubscribeThrow()
        {
            var tcs = new TaskCompletionSource<object>();
            int i = 0;
            GetObservable().Subscribe(n =>
            {
                Assert.AreEqual(i, n);
                ++i;
                if (i == 5)
                {
                    throw new Exception("xo-xo");
                }
            }, e =>
            {
                Assert.AreEqual("xo-xo", e.Message);
                tcs.TrySetResult(null);
            }, Assert.Fail);

            tcs.Task.Wait();
        }
    }
}

单元测试SubscribeCancel and SubscribeThrow超时,因为OnError回调永远不会被调用,因此任务的等待永远不会结束。

怎么了?

P.S.

这个问题与如何正确地将 SqlDataReader 与 IObservable 包装在一起?

EDIT

与此同时,我创建了一个新的 Rx 问题 -https://rx.codeplex.com/workitem/74

Also http://social.msdn.microsoft.com/Forums/en-US/5d0a4808-3ee0-4ff0-ab11-8cd36460cd66/why-is-the-onerror-callback-never-known-when-throwing-from-the-给定订阅者?forum=rx

EDIT2

以下观察者实现产生完全相同的结果,即使它符合第 6.5 段接收设计指南- “订阅实现不应抛出”:

private static IObservable<int> GetObservable(int count = 100, int msWait = 10)
{
    return Observable.Create<int>(async (obs, cancellationToken) =>
    {
        try
        {
            for (int i = 0; i < count && !cancellationToken.IsCancellationRequested; ++i)
            {
                int value = i;
                obs.OnNext(await Task.Factory.StartNew(() =>
                {
                    Thread.Sleep(msWait);
                    return value;
                }));
            }
            obs.OnCompleted();
        }
        catch (Exception exc)
        {
            obs.OnError(exc);
        }
    });
}

EDIT3

我开始相信,当异步可观察序列集成到其他同步代码中时,应该编写这样的代码(通常是服务器端某个地方或另一个地方的情况):

var tcs = new TaskCompletionSource<object>();
GetObservable().Subscribe(n =>
{
  try
  {
    ...
  }
  catch (Exception e)
  {
    DoErrorLogic();
    tcs.TrySetException(e);
  }
}, e =>
{
  DoErrorLogic();
  tcs.TrySetException(e);
}, () => 
{
  DoCompletedLogic();
  tcs.TrySetResult(null);
});

tcs.Task.Wait();

真的是这样吗?

EDIT 4

我想你想说的话终于开始从我生锈的大脑里流下来了。我现在要切换到我的另一个帖子 -如何正确地将 SqlDataReader 与 IObservable 包装在一起?


此行为是设计使然。如果订阅者抛出异常(顺便说一下,这是不好的做法),Rx 框架会正确地推断它已死亡并不再与其通信。如果取消订阅,这也不是错误 - 只是请求不再发送任何类型的进一步事件 - Rx 对此表示尊重。

编辑回应评论

我认为文档中没有一个简单的参考可以指出 - 您所看到的行为是如此内在,它是隐含的。我能得到的最接近的是向您指出源代码匿名安全观察者 and 自动分离观察者。后者有一个可能有帮助的解释性场景,但有点复杂。

也许一个类比会有所帮助。想象一下数据流事件是报刊亭递送的报纸。订阅者是家庭。

订阅者抛出异常

报刊亭高兴地送报纸,直到有一天,其中一位订阅者——琼斯先生——打开了煤气,他的房子爆炸了,杀死了琼斯先生并摧毁了房子(抛出未处理的异常)。报刊亭意识到他无法再向琼斯先生递送报纸,也无法发送终止通知,并且报纸供应没有问题(因此 OnError 或 OnCompleted 不合适),报刊亭继续减少一名订户。

与此形成鲜明对比的是,报纸印刷商无意中使用了易燃墨水,导致工厂着火。现在可怜的报刊亭确实必须向all供应无限期停止的是订户。

订阅者取消订阅

琼斯先生正在通过订阅接收报纸,直到有一天,他觉得自己厌倦了无休止的令人沮丧的故事,并要求取消订阅。报刊亭有义务。他没有给琼斯先生发一张便条,解释报纸已经停止印刷版本(没有 OnCompleted)——他们没有。他也没有给琼斯先生发一张便条,解释报纸已经停业(没有 OnError)——他只是按照琼斯先生的要求停止递送报纸。

对编辑3的回应

我同情你的挣扎。我注意到在您的整个代码中,您一直在尝试将 TPL(任务)习惯与 Rx 习惯相结合。这样的尝试常常让人感觉很笨拙,因为它们确实是完全不同的世界。很难对这样的一段进行评论:

我开始相信,当异步可观察序列集成到其他同步代码中时,应该编写这样的代码(通常是服务器端某个地方或另一个地方的情况):

强烈同意布兰登的精心断言,我想不出真正适合以您尝试的方式将异步代码与服务器端的同步代码集成的实例。这对我来说就像一种设计的味道。按照惯例,人们会尝试使代码保持反应性 - 进行订阅,并让订阅者反应性地处理工作。我不记得有必要按照您描述的方式转换为同步代码。

当然,看看您在 Edit3 中编写的代码,并不清楚您想要实现什么目标。这不是当事人的责任source对 a 中的错误做出反应订户。这就是尾巴摇狗。确保订阅者服务连续性所需的异常处理程序应该位于订阅处理代码中,而不是位于可观察源中 - 它应该只关心防止恶意观察者行为。这样的逻辑在上面链接的 AnonymousSafeObserver 中实现,并且被大多数 Rx 提供的操作员使用。可观察的东西很可能有逻辑来处理其连续性source数据 - 但这是一个不同的问题,而不是您在代码中解决的问题。

无论您在何处尝试通过调用桥接到同步代码ToTask or Wait可能有理由仔细考虑你的设计。

我认为提供更具体的问题陈述(也许来自您试图解决的现实世界场景)将有助于为您引出更有用的建议。您所说的“SqlDataReader”示例...

最后,人们可以通过订阅直接使用可观察的[包装 SqlDataReader],但他们必须在某个时刻等待结束(阻塞线程),因为大多数代码仍然是同步的。

...突出了您所处的设计困境。在这种情况下,您推断此类消费者显然会更好地使用IEnumerable<T>接口 - 或者可能要求一个IObservable<List<T>>。但关键是要着眼于更大的图景,事实上您正在尝试将 SqlDataReader 包装在可观察的包装器中at all是一种设计味道 - 因为这是响应特定的一次性请求而提供固定数据的过程。这可能是一种异步场景,但并不是真正的反应式场景。与更典型的对比reactive像“只要你得到股票 X 的价格就给我发送”这样的场景,你完全按照源的要求设置未来的数据流,以便订阅者做出反应。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

为什么从给定订阅者抛出异常时永远不会调用 OnError 回调? 的相关文章

  • 在 xaml 中编写嵌套类型时出现设计时错误

    我创建了一个用户控件 它接受枚举类型并将该枚举的值分配给该用户控件中的 ComboBox 控件 很简单 我在数据模板中使用此用户控件 当出现嵌套类型时 问题就来了 我使用这个符号来指定 EnumType x Type myNamespace
  • 类型中的属性名称必须是唯一的

    我正在使用 Entity Framework 5 并且有以下实体 public class User public Int32 Id get set public String Username get set public virtual
  • std::list 线程push_back、front、pop_front

    std list 线程安全吗 我假设不是这样 所以我添加了自己的同步机制 我认为我有正确的术语 但我仍然遇到问题 每个函数都由单独的线程调用 Thread1 不能等待 它必须尽可能快 std list
  • std::vector 与 std::stack

    有什么区别std vector and std stack 显然 向量可以删除集合中的项目 尽管比列表慢得多 而堆栈被构建为仅后进先出的集合 然而 堆栈对于最终物品操作是否更快 它是链表还是动态重新分配的数组 我找不到关于堆栈的太多信息 但
  • 用于 FTP 的文件系统观察器

    我怎样才能实现FileSystemWatcherFTP 位置 在 C 中 这个想法是 每当 FTP 位置添加任何内容时 我都希望将其复制到我的本地计算机 任何想法都会有所帮助 这是我之前问题的后续使用 NET 进行选择性 FTP 下载 ht
  • C# 列表通用扩展方法与非通用扩展方法

    这是一个简单的问题 我希望 集合类中有通用和非通用方法 例如List
  • WcfSvcHost 的跨域异常

    对于另一个跨域问题 我深表歉意 我一整天都在与这个问题作斗争 现在已经到了沸腾的地步 我有一个 Silverlight 应用程序项目 SLApp1 一个用于托管 Silverlight SLApp1 Web 的 Web 项目和 WCF 项目
  • 结构体的内存大小不同?

    为什么第一种情况不是12 测试环境 最新版本的 gcc 和 clang 64 位 Linux struct desc int parts int nr sizeof desc Output 16 struct desc int parts
  • 为什么 C# 2.0 之后没有 ISO 或 ECMA 标准化?

    我已经开始学习 C 并正在寻找标准规范 但发现大于 2 0 的 C 版本并未由 ISO 或 ECMA 标准化 或者是我从 Wikipedia 收集到的 这有什么原因吗 因为编写 审查 验证 发布 处理反馈 修订 重新发布等复杂的规范文档需要
  • 实例化类时重写虚拟方法

    我有一个带有一些虚函数的类 让我们假设这是其中之一 public class AClassWhatever protected virtual string DoAThingToAString string inputString retu
  • 为什么使用小于 32 位的整数?

    我总是喜欢使用最小尺寸的变量 这样效果就很好 但是如果我使用短字节整数而不是整数 并且内存是 32 位字可寻址 这真的会给我带来好处吗 编译器是否会做一些事情来增强内存使用 对于局部变量 它可能没有多大意义 但是在具有数千甚至数百万项的结构
  • 如何在 Linq to SQL 中使用distinct 和 group by

    我正在尝试将以下 sql 转换为 Linq 2 SQL select groupId count distinct userId from processroundissueinstance group by groupId 这是我的代码
  • C# 中的 IPC 机制 - 用法和最佳实践

    不久前我在 Win32 代码中使用了 IPC 临界区 事件和信号量 NET环境下场景如何 是否有任何教程解释所有可用选项以及何时使用以及为什么 微软最近在IPC方面的东西是Windows 通信基础 http en wikipedia org
  • 为什么C++代码执行速度比java慢?

    我最近用 Java 编写了一个计算密集型算法 然后将其翻译为 C 令我惊讶的是 C 的执行速度要慢得多 我现在已经编写了一个更短的 Java 测试程序和一个相应的 C 程序 见下文 我的原始代码具有大量数组访问功能 测试代码也是如此 C 的
  • C# 中最小化字符串长度

    我想减少字符串的长度 喜欢 这串 string foo Lorem ipsum dolor sit amet consectetur adipiscing elit Aenean in vehicula nulla Phasellus li
  • C# 使用“?” if else 语句设置值这叫什么

    嘿 我刚刚看到以下声明 return name null name NA 我只是想知道这在 NET 中叫什么 是吗 代表即然后执行此操作 这是一个俗称的 条件运算符 三元运算符 http en wikipedia org wiki Tern
  • DotNetZip:如何提取文件,但忽略zip文件中的路径?

    尝试将文件提取到给定文件夹 忽略 zip 文件中的路径 但似乎没有办法 考虑到其中实现的所有其他好东西 这似乎是一个相当基本的要求 我缺少什么 代码是 using Ionic Zip ZipFile zf Ionic Zip ZipFile
  • 现代编译器是否优化乘以 1 和 -1

    如果我写 template
  • 如何确定 CultureInfo 实例是否支持拉丁字符

    是否可以确定是否CultureInfo http msdn microsoft com en us library system globalization cultureinfo aspx我正在使用的实例是否基于拉丁字符集 我相信你可以使
  • 使用 WGL 创建现代 OpenGL 上下文?

    我正在尝试使用 Windows 函数创建 OpenGL 上下文 现代版本 基本上代码就是 创建窗口类 注册班级 创建一个窗口 choose PIXELFORMATDESCRIPTOR并设置它 创建旧版 OpenGL 上下文 使上下文成为当前

随机推荐

  • jQuery .each 在 Safari 上比 Chrome/Firefox 慢

    我有一个大型 HTML 表格 1 000 1 500 行 40 列宽 我有一些输入和选择框 以便用户可以过滤行 附加的相关 javascript jquery 注意 没有粘贴整个代码库 因为它不是瓶颈 如下所示 function autoR
  • 错误:文档未定义,在构建角度通用应用程序时

    Error factory require jquery document window navigator ReferenceError document is not defined 面对角度通用渲染服务器端的问题 我用谷歌搜索了这个并
  • 如何理解UML中类似自反关联的“自反聚合”关系

    这个类图是什么意思 自反关联的类图使用了实线和箭头 但这里用空心菱形代替 和递归有关系吗 这个类图会生成什么 如果能举个例子就最好了 另外这个递归关系应该是一对多 如果存储数据库怎么建表 这是什么意思 这个类图意味着Unit can与其他几
  • 如何保持Python脚本输出窗口打开?

    我刚刚开始使用Python 当我在 Windows 上执行 python 脚本文件时 输出窗口出现但立即消失 我需要它留在那里 以便我可以分析我的输出 我怎样才能保持它打开 您有几个选择 从已打开的终端运行该程序 打开命令提示符并键入 py
  • Android Studio Arctic Fox (Adb) - 连接的设备在一段时间后断开连接

    我有 7 部不同的 Android 手机 它们都有不同的 Android 操作系统 他们都面临着同样的断线问题 大约一小时后 设备将断开连接 当它断开连接时 Allow USB debugging 出现弹出窗口twice并希望我允许已批准的
  • 将 RIGHT 外连接转换为左外连接 SQLite

    我需要为 Sqlite 设计一个查询 原来的查询是这样的 FROM PhysicianActivity INNER JOIN Activity ON PhysicianActivity ActivityID Activity ID RIGH
  • 在运行时将 byte[] 加载到 System.Windows.Controls.Image 中

    我有一个代表 png 文件的 byte 我正在通过 WebClient 下载此 png 当 WebClient 下载了我通过 URL 引用的 png 时 我得到一个 byte 我的问题是 如何将 byte 加载到 WPF 中的 System
  • 调整大小后 QRubberBand 在 QGraphicsView 上移动

    我在这个主题中遇到了同样的问题 当我调整窗口大小时 QRubberBand 移动 经过几次尝试 我意识到本主题的解决方案不适用于 QGraphics 视图 当我调整窗口大小时 为什么我的选择会围绕 QgraphicsView 移动 impo
  • 如何使用seaborn displot 将直方图条形围绕刻度线居中?堆叠酒吧是必不可少的

    我搜索了许多制作以刻度线为中心的直方图的方法 但无法找到适用于 seaborn displot 的解决方案 函数 displot 允许我根据数据框中的列堆叠直方图 因此更喜欢使用 displot 的解决方案或允许基于数据框中的列进行堆叠的解
  • 遍历整个 PDF 并将蓝色更改为黑色(同时更改下划线的颜色)+ iText

    我正在使用下面的代码从 pdf 文本中删除蓝色 它运行良好 但它不是改变下划线颜色 而是正确改变文本颜色 原始文件部分 被操纵的文件 正如您在上面的操作文件中看到的 下划线颜色没有改变 两周以来我一直在寻找解决这个问题的方法 任何人都可以帮
  • Firebase 中的嵌套列表[重复]

    这个问题在这里已经有答案了 尝试了解如何在 Firebase 中实现嵌套列表 问题可简化为 一个 1 对 N 消息传递系统 其中 对于每条消息 您希望维护已接收并阅读该消息的用户列表 已读过 Firebase 中数组的最佳实践 尝试避免使用
  • 如何更改 pandas groupby.agg 函数的输入参数?

    我在使用 groupby object agg 方法和想要更改输入参数的函数时遇到问题 是否有可用的函数名称资源 agg 接受 以及如何将参数传递给它们 请参阅下面的示例 import pandas as pd import numpy a
  • Jenkins 无法访问已安装的驱动程序

    我的 Jenkins 服务器在 Windows 2008 服务器上的 Tomcat 中运行 我在CentOS上搭建了一个NFS服务器 Win2008服务器可以通过mount命令访问NFS共享文件夹 我在 Windows 上以管理员用户身份运
  • 阻止孩子覆盖父母的轮廓?

    由于某些元素位于幻灯片内 我使用带有负偏移量的轮廓而不是边框 然而 子元素覆盖了轮廓 但我想要它们上面的边框 我用它来构建内容 http jsfiddle net z22kw2zq 1 parent position relative ou
  • 有效地查找数组中元素的行列?

    如何有效地找到数组中每个元素的排名 在平局的情况下求平均值 例如 float rank T T input Implementation auto foo rank 3 6 4 2 2 foo 3 5 4 1 5 1 5 我能想到的唯一方法
  • 当应用程序处于后台或关闭状态时,无法从 Firebase 消息服务将记录插入 SQLite 数据库

    我正在尝试 Firebase 通知 我能够使用通知正常工作this文档 消息已收到 我能够从内部向通知栏发送通知MyFirebaseMessagingService服务等级 即使应用程序处于后台或关闭时也会发生这种情况 我需要的是收集通知中
  • 授权 Twitter 成功后返回应用

    我将我的应用程序配置为使用通过推特登录特征 但在 Safari 上授权应用程序成功后 我无法返回到我的应用程序 我在堆栈溢出上看到了另一个有同样问题的问题使用 URLShemes但这对我不起作用 我没有足够的声誉来添加评论来要求他们接受答案
  • 充分利用 MVC Owin 身份和 n(3) 层架构

    我一直在学习开箱即用欧文身份我喜欢它为我们提供的用户管理的易用性 然后我遇到的问题是它通过 似乎 直接与 EF 交互ApplicationDbContext这是我不想要的 我更喜欢使用我的 3 层架构 IE 它与服务层 BLL 交互 而服务
  • 区分退出和会话超时

    我有以下要求 当用户终止 bash 会话时生成审核日志 退出 当 bash 会话超时时生成审核日志 这些审核日志必须不同 我正在玩下面的脚本trap sh export TMOUT 10 function handle timeout ec
  • 为什么从给定订阅者抛出异常时永远不会调用 OnError 回调?

    请观察以下单元测试 using System using System Reactive Linq using System Threading using System Threading Tasks using Microsoft Vi