我正在尝试将 Rx(.net) 用于项目,但我遇到了如何正确处置在Observable.Create()
并发出OnNext()
。我的设置如下所示(希望仅缩短到相关位):
var obs = Observable.Create<ReactiveRunData>(async (o) =>
{
if (someCondition)
{
RunData runData = await CreateRunData(); // RunData is IDisposable, needs to be disposed
o.OnNext(runData);
}
o.OnCompleted();
return Disposable.Empty;
})
.Concat(Observable.Empty<RunData>().Delay(TimeSpan.FromSeconds(2)))
.Repeat() // Resubscribe indefinitely after source completes
.Publish().RefCount() // see http://northhorizon.net/2011/sharing-in-rx/
;
这是我对可观察集合的实现,该集合是无限的并生成一个项目(类型为RunData
)每 2 秒一次。
然后我通过使用各种运算符转换 IObservable 流来执行实际的反应性操作:
var final = obs
.Select(runData => ...)
// lots of other operations
.Select(tuple => (tuple.runData, tuple.result));`
最终的可观察值返回一个元组(RunData, Result)
.
当订阅该可观察对象时,最后我明确地处置了RunData
实例,像这样:
final.Subscribe(
async (tuple) =>
{
var (runData, result) = tuple;
try
{
// ... do something with result and runData, await some stuff, ...
} catch (Exception e)
{
// error handling
} finally
{
// dispose of runData
runData.Dispose();
}
},
(Exception e) =>
{
// error handling
});
我怀疑这个实现以各种方式存在泄漏,例如当从不同的地方抛出异常时,在某些情况下,我相信 RunData 实例不会被释放,而是消失了,被通过管道传输的异常所取代。
我相信如果我向我的可观察对象添加第二个订阅者,我也会遇到问题,对吧?我不需要多个订阅者,但这也让我对我的实现产生疑问。
我觉得传递需要由订阅者处理的数据的整个想法是错误的,但我想不出更好的方法。我尝试使用Observable.Using()
,但据我所知,仅在序列结束时才释放资源,而就我而言,永远不会。我真的需要一个无限序列,因为我希望能够使用像Scan()
随着时间的推移,参考以前的数据来构建中间数据结构。
我还尝试使用从 lambda 返回的回调Observable.Create()
但这是在 Observable.Create() 完成后立即触发的,而不是在订阅者完成后触发的,因此导致了竞争条件(但我什至不确定我是否理解正确,RX + 异步很棘手)。
那么...我怎样才能正确实施呢?
对于一些背景信息,RunData
包括(除其他外)数据库事务和 AutofacLifetimeScope
,我想在整个管道中使用它们,并且都需要在订阅者完成后在最后进行处理。