RX - 重新抛出包含方法中的错误

2024-01-12

我需要翻译 RX 流中的错误(IObservable) 进入包含流订阅的方法中的异常

(因为这个问题https://github.com/aspnet/SignalR/pull/1331 https://github.com/aspnet/SignalR/pull/1331,因此错误不会序列化给客户端。)一旦解决此问题,我将恢复正确处理错误

e.g.

我有以下方法

public IObservable<StreamItem> LiveStream()
{
    _mySvc.Start();
    return _mySvc.ThingChanged();
}

所以我尝试订阅流并重新抛出错误,但它仍然没有传输到客户端:

public IObservable<StreamItem> LiveStream()
{
    _mySvc.Start();
    _mySvc.ThingChanged().Subscribe(item => {}, OnError, () => {});
    return _mySvc.ThingChanged();
}

private void OnError(Exception exception)
{
    throw new Exception(exception.Message);
}

我需要的是相当于扔进 Live Stream 方法

例如该错误被传播到客户端

public IObservable<StreamItem> LiveStream()
{
    _mySvc.Start();
    throw new Exception("some error message");
    return _mySvc.ThingChanged();
}

有什么想法如何实现这一目标?


我也发现了这一点,特别是对于“包含的”反应式管道,即具有明确定义的开始和结束的管道。在这种情况下,只需允许底层异常冒泡到包含范围就足够了。但正如您所发现的,这个概念通常对于 Rx 来说相当陌生:管道中发生的事情仍保留在管道中。

我在包含的场景中发现的唯一方法是使用以下命令将错误从流中“滑出”Catch(),并交回一个空的IObservable让流自然停止(否则,如果你awaiting an IObservable以便完成)。

这是行不通的within your LiveStream()方法,因为在您使用流之前,上下文/范围应该已经不存在了。因此,这必须在包含整个管道的上下文中发生。

Exception error = null;
var source = LiveStream()
  .Catch<WhatYoureStreaming, Exception>(ex => {error = ex; return Observable.Empty<WhatYoureStreaming>(); })
  ...

await source; // if this is how you're awaiting completion

// not a real exception type, use your own
if (error != null) throw new ContainingException("oops", error);

只是不要throw error最后,您将丢失原始的堆栈跟踪。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RX - 重新抛出包含方法中的错误 的相关文章

随机推荐