System.Reactive 中的并发订阅者执行

2023-12-14

我正在编写一个批处理管道,每 Y 秒处理 X 个未完成的操作。感觉像System.Reactive非常适合这个,但我无法让订阅者并行执行。我的代码如下所示:

var subject = new Subject<int>();

var concurrentCount = 0;

using var reader = subject
    .Buffer(TimeSpan.FromSeconds(1), 100)
    .Subscribe(list => 
    {
        var c = Interlocked.Increment(ref concurrentCount);
        if (c > 1) Console.WriteLine("Executing {0} simultaneous batches", c); // This never gets printed, because Subscribe is only ever called on a single thread.
        Interlocked.Decrement(ref concurrentCount);
    });
    
Parallel.For(0, 1_000_000, i =>
{
    subject.OnNext(i);
 });
subject.OnCompleted();

有没有一种优雅的方式来读取这个缓冲Subject,以并发方式?


Rx 订阅代码始终是同步的。您需要做的是从Subscribe委托,并使其成为可观察序列的副作用。具体方法如下:

Subject<int> subject = new();
int concurrentCount = 0;

Task processor = subject
    .Buffer(TimeSpan.FromSeconds(1), 100)
    .Select(list => Observable.Defer(() => Observable.Start(() =>
    {
        int c = Interlocked.Increment(ref concurrentCount);
        if (c > 1) Console.WriteLine($"Executing {c} simultaneous batches");
        Interlocked.Decrement(ref concurrentCount);
    })))
    .Merge(maxConcurrent: 2)
    .DefaultIfEmpty() // Prevents exception in corner case (empty source)
    .ToTask(); // or RunAsync (either one starts the processor)

for (int i = 0; i < 1_000_000; i++)
{
    subject.OnNext(i);
}
subject.OnCompleted();

processor.Wait();

The Select+Observable.Defer+Observable.Start组合将源序列转换为IObservable<IObservable<Unit>>。它是一个嵌套序列,每个内部序列代表一个序列的处理list。当代表Observable.Start完成后,内部序列发出一个Unit值,然后完成。包装Defer运算符确保内部序列是“冷”的,以便它们在被订阅之前不会启动。然后遵循Merge运算符,它将外部序列展开为平面IObservable<Unit>顺序。这maxConcurrent参数配置将同时订阅多少个内部序列。每次内部序列被订阅时Merge运算符,对应的Observable.Start委托开始在ThreadPool thread.

如果您设置maxConcurrent太高了,则ThreadPool可能会耗尽工人(换句话说,它可能会饱和),并且 然后,您的代码的并发性将取决于ThreadPool可用性。如果您愿意,您可以增加ThreadPool通过使用立即按需创建ThreadPool.SetMinThreads方法。但是,如果您的工作负载受 CPU 限制,并且您将工作线程增加到高于Environment.ProcessorCount值,那么您的 CPU 很可能会饱和。

如果您的工作负载是异步的,您可以替换Observable.Defer+Observable.StartObservable.FromAsync运算符,如图here.

¹ An unpublished library exists, the AsyncRx.NET, that plays with the idea of asynchronous subscriptions. It is based on the new interfaces IAsyncObservable<T> and IAsyncObserver<T>.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

System.Reactive 中的并发订阅者执行 的相关文章