为什么这个 Observable.Generate 重载会导致内存泄漏? [使用时间跨度 < 15ms]

2024-02-02

以下 Rx.NET 代码在我的机器上运行大约 10 秒后将使用大约 500 MB 的内存。

var stream =
    Observable.Range(0, 10000)
              .SelectMany(i => Observable.Generate(
                  0, 
                  j => true, 
                  j => j + 1, 
                  j => new { N = j },
                  j => TimeSpan.FromMilliseconds(1)));

stream.Subscribe();

如果我使用Observable.Generate过载无Func<int, TimeSpan>参数我的内存使用量稳定在 35 MB。

var stream =
    Observable.Range(0, 10000)
              .SelectMany(i => Observable.Generate(
                  0,
                  j => true,
                  j => j + 1,
                  j => new { N = j }));
                  // j => TimeSpan.FromMilliseconds(1))); ** Removed! **

stream.Subscribe();

似乎只有在使用 SelectMany() 或 Merge() 扩展方法时才会出现问题。


这是使用哪个默认调度程序的问题。

随着TimeSpan调度程序的版本是DefaultScheduler。没有TimeSpan it is CurrentThreadScheduler.

因此,对于基于时间的生成,它会非常快速地尝试安排所有操作,并基本上建立一个等待执行的大量事件队列。因此它使用了大量的内存。

对于非基于时间的生成,它使用当前线程,因此它将连续生成和消耗每个生成的值,从而使用很少的内存。

哦,这不是内存泄漏。如果您尝试以快于消耗速度的速度安排无限数量的值,这只是正常操作。


我反编译了代码以找出使用了哪些调度程序。

这是非基于时间的反编译:

public static IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector)
{
    if (condition == null)
        throw new ArgumentNullException("condition");
    if (iterate == null)
        throw new ArgumentNullException("iterate");
    if (resultSelector == null)
        throw new ArgumentNullException("resultSelector");
    return Observable.s_impl.Generate<TState, TResult>(initialState, condition, iterate, resultSelector);
}

public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector)
{
    return (IObservable<TResult>)new Generate<TState, TResult>(initialState, condition, iterate, resultSelector, SchedulerDefaults.Iteration);
}

internal static IScheduler Iteration
{
    get
    {
        return (IScheduler)CurrentThreadScheduler.Instance;
    }
}

以上方法均来自Observable, QueryLanguage, and SchedulerDefaults分别。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

为什么这个 Observable.Generate 重载会导致内存泄漏? [使用时间跨度 < 15ms] 的相关文章

随机推荐