如何使以下可观察的重复,直到stream.DataAvailable为假?
目前看来它永远不会停止。
Defer 部分内的 AsyncReadChunk 和 Observable.Return 进行 OnNext 调用,然后进行 OnCompleted 调用。
当 Repeat 收到 OnNext 调用时,它将其传递给 TakeWhile。当 TakeWhile 不满足时,它会完成可观察量,但我认为 OnNext 之后出现的 OnCompleted 速度太快,以至于导致 Repeat 重新订阅可观察量并导致无限循环。
我该如何纠正这种行为?
public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize)
{
return Observable.Defer(() =>
{
try
{
return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0]);
}
catch (Exception)
{
return Observable.Return(new byte[0]);
}
})
.Repeat()
.TakeWhile((dataChunk, index) => dataChunk.Length > 0);
}
自答:(下面是问题作者 Samet 发布的答案。但是,他将答案作为问题的一部分发布。我将其移至单独的答案中,标记为社区 wiki,因为作者尚未移动它他自己。)
我通过重构发现是调度器的问题。 Return 使用 Immediate 调度程序,而 Repeat 使用 CurrentThread。固定代码如下。
public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize)
{
return Observable.Defer(() =>
{
try
{
return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0], Scheduler.CurrentThread);
}
catch (Exception)
{
return Observable.Return(new byte[0], Scheduler.CurrentThread);
}
})
.Repeat()
.TakeWhile((dataChunk, index) => dataChunk.Length > 0);
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)