然而,虽然 eventStream 仍然存在并且应该保留在内存中,但此方法创建了 2 个新的 IObservables - 一个由Where() 方法返回,可能由 eventStream 保存在内存中,另一个由Cast() 方法可能由Where() 方法返回的方法保存在内存中。
你有这个落后。让我们回顾一下正在发生的事情的链条。
IObservable<T> eventStream; //you have this defined and assigned somewhere
public IDisposable Subscribe<T>(IObserver<T> observer)
{
//let's break this method into multiple lines
IObservable<T> whereObs = eventStream.Where(e => e is T);
//whereObs now has a reference to eventStream (and thus will keep it alive),
//but eventStream knows nothing of whereObs (thus whereObs will not be kept alive by eventStream)
IObservable<T> castObs = whereObs.Cast<T>();
//as with whereObs, castObs has a reference to whereObs,
//but no one has a reference to castObs
IDisposable ret = castObs.Subscribe(observer);
//here is where it gets tricky.
return ret;
}
What ret
是否有引用取决于各种可观察量的实现。从我在 Rx 库的 Reflector 和我自己编写的运算符中看到的情况来看,大多数运算符不会返回引用可观察运算符本身的一次性对象。
例如,一个基本的实现Where
会是这样的(直接在编辑器中输入,没有错误处理)
IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> filter)
{
return Observable.Create<T>(obs =>
{
return source.Subscribe(v => if (filter(v)) obs.OnNext(v),
obs.OnError, obs.OnCompleted);
}
}
请注意,返回的一次性将通过创建的观察者引用过滤器函数,但不会引用Where
可观察到的。Cast
可以使用相同的模式轻松实现。本质上,操作员成为观察者包装工厂。
所有这些对当前问题的影响是中间 IObservables 有资格进行垃圾收集在方法结束时。过滤函数传递给Where
只要订阅存在,就会一直存在,但是一旦订阅被处置或完成,仅eventStream
仍然存在(假设它还活着)。
EDIT对于 supercat 的评论,让我们看看编译器如何重写它,或者如何在没有闭包的情况下实现它。
class WhereObserver<T> : IObserver<T>
{
WhereObserver<T>(IObserver<T> base, Func<T, bool> filter)
{
_base = base;
_filter = filter;
}
IObserver<T> _base;
Func<T, bool> _filter;
void OnNext(T value)
{
if (filter(value)) _base.OnNext(value);
}
void OnError(Exception ex) { _base.OnError(ex); }
void OnCompleted() { _base.OnCompleted(); }
}
class WhereObservable<T> : IObservable<T>
{
WhereObservable<T>(IObservable<T> source, Func<T, bool> filter)
{
_source = source;
_filter = filter;
}
IObservable<T> source;
Func<T, bool> filter;
IDisposable Subscribe(IObserver<T> observer)
{
return source.Subscribe(new WhereObserver<T>(observer, filter));
}
}
static IObservable<T> Where(this IObservable<T> source, Func<T, bool> filter)
{
return new WhereObservable(source, filter);
}
您可以看到观察者不需要对生成它的可观察对象进行任何引用,并且可观察对象也不需要跟踪它创建的观察者。我们甚至没有创建任何新的 IDisposable 来从我们的订阅中返回。
实际上,Rx 有一些用于匿名可观察/观察者的实际类,它们接受委托并将接口调用转发给这些委托。它使用闭包来创建这些委托。编译器不需要生成实际实现接口的类,但翻译的精神保持不变。