我正在编写一个批处理管道,每 Y 秒处理 X 个未完成的操作。感觉像System.Reactive
非常适合这个,但我无法让订阅者并行执行。我的代码如下所示:
var subject = new Subject<int>();
var concurrentCount = 0;
using var reader = subject
.Buffer(TimeSpan.FromSeconds(1), 100)
.Subscribe(list =>
{
var c = Interlocked.Increment(ref concurrentCount);
if (c > 1) Console.WriteLine("Executing {0} simultaneous batches", c); // This never gets printed, because Subscribe is only ever called on a single thread.
Interlocked.Decrement(ref concurrentCount);
});
Parallel.For(0, 1_000_000, i =>
{
subject.OnNext(i);
});
subject.OnCompleted();
有没有一种优雅的方式来读取这个缓冲Subject
,以并发方式?
Rx 订阅代码始终是同步的。您需要做的是从Subscribe
委托,并使其成为可观察序列的副作用。具体方法如下:
Subject<int> subject = new();
int concurrentCount = 0;
Task processor = subject
.Buffer(TimeSpan.FromSeconds(1), 100)
.Select(list => Observable.Defer(() => Observable.Start(() =>
{
int c = Interlocked.Increment(ref concurrentCount);
if (c > 1) Console.WriteLine($"Executing {c} simultaneous batches");
Interlocked.Decrement(ref concurrentCount);
})))
.Merge(maxConcurrent: 2)
.DefaultIfEmpty() // Prevents exception in corner case (empty source)
.ToTask(); // or RunAsync (either one starts the processor)
for (int i = 0; i < 1_000_000; i++)
{
subject.OnNext(i);
}
subject.OnCompleted();
processor.Wait();
The Select
+Observable.Defer
+Observable.Start
组合将源序列转换为IObservable<IObservable<Unit>>
。它是一个嵌套序列,每个内部序列代表一个序列的处理list
。当代表Observable.Start
完成后,内部序列发出一个Unit
值,然后完成。包装Defer
运算符确保内部序列是“冷”的,以便它们在被订阅之前不会启动。然后遵循Merge运算符,它将外部序列展开为平面IObservable<Unit>
顺序。这maxConcurrent
参数配置将同时订阅多少个内部序列。每次内部序列被订阅时Merge
运算符,对应的Observable.Start
委托开始在ThreadPool
thread.
如果您设置maxConcurrent
太高了,则ThreadPool
可能会耗尽工人(换句话说,它可能会饱和),并且
然后,您的代码的并发性将取决于ThreadPool
可用性。如果您愿意,您可以增加ThreadPool
通过使用立即按需创建ThreadPool.SetMinThreads方法。但是,如果您的工作负载受 CPU 限制,并且您将工作线程增加到高于Environment.ProcessorCount
值,那么您的 CPU 很可能会饱和。
如果您的工作负载是异步的,您可以替换Observable.Defer
+Observable.Start
与Observable.FromAsync
运算符,如图here.
¹ An unpublished library exists, the AsyncRx.NET, that plays with the idea of asynchronous subscriptions. It is based on the new interfaces IAsyncObservable<T>
and IAsyncObserver<T>
.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)