我想订阅IObservable<T>
并在收到类型的第一个元素后立即取消订阅(处置)订阅T
,即我只想在订阅后获得的第一个元素上调用操作。
这是我想出的方法:
public static class Extensions
{
public static void SubscribeOnce<T>(this IObservable<T> observable, Action<T> action)
{
IDisposable subscription = null;
subscription = observable.Subscribe(t =>
{
action(t);
subscription.Dispose();
});
}
}
用法示例:
public class Program
{
public static void Main()
{
var subject = new Subject<int>();
subject.OnNext(0);
subject.OnNext(1);
subject.SubscribeOnce(i => Console.WriteLine(i));
subject.OnNext(2);
subject.OnNext(3);
Console.ReadLine();
}
}
它按预期工作并且仅打印2
. 这有什么问题或者其他需要考虑的吗?是否有更干净的方法使用 RX 开箱即用的扩展方法?
var source = new Subject();
source
.Take(1)
.Subscribe(Console.WriteLine);
source.OnNext(5);
source.OnNext(6);
source.OnError(new Exception("oops!"));
source.OnNext(7);
source.OnNext(8);
// Output is "5". Subscription is auto-disposed. Error is ignored.
Take
订阅后自动销毁nth
元素产量。 :)
至于要考虑的其他事项,对于您的自定义可观察对象,您应该注意,您可能还希望将 OnError 和 OnCompleted 通知传递给您的观察者,Take 也会为您处理。
内置运算符还有其他好处,例如更好的Dispose
处理。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)