这里有一个TransformBlock
and ActionBlock
每当收到较新的消息并且接收到新消息时,就会删除队列中最旧的消息BoundedCapacity
已达到限制。它的行为非常类似于Channel配置有BoundedChannelFullMode.DropOldest.
public static IPropagatorBlock<TInput, TOutput>
CreateTransformBlockDropOldest<TInput, TOutput>(
Func<TInput, Task<TOutput>> transform,
ExecutionDataflowBlockOptions dataflowBlockOptions = null,
IProgress<TInput> droppedMessages = null)
{
if (transform == null) throw new ArgumentNullException(nameof(transform));
dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();
var boundedCapacity = dataflowBlockOptions.BoundedCapacity;
var cancellationToken = dataflowBlockOptions.CancellationToken;
var queue = new Queue<TInput>(Math.Max(0, boundedCapacity));
var outputBlock = new BufferBlock<TOutput>(new DataflowBlockOptions()
{
BoundedCapacity = boundedCapacity,
CancellationToken = cancellationToken
});
if (boundedCapacity != DataflowBlockOptions.Unbounded)
dataflowBlockOptions.BoundedCapacity = checked(boundedCapacity * 2);
// After testing, at least boundedCapacity + 1 is required.
// Make it double to be sure that all non-dropped messages will be processed.
var transformBlock = new ActionBlock<object>(async _ =>
{
TInput item;
lock (queue)
{
if (queue.Count == 0) return;
item = queue.Dequeue();
}
var result = await transform(item).ConfigureAwait(false);
await outputBlock.SendAsync(result, cancellationToken).ConfigureAwait(false);
}, dataflowBlockOptions);
dataflowBlockOptions.BoundedCapacity = boundedCapacity; // Restore initial value
var inputBlock = new ActionBlock<TInput>(item =>
{
var droppedEntry = (Exists: false, Item: (TInput)default);
lock (queue)
{
transformBlock.Post(null);
if (queue.Count == boundedCapacity) droppedEntry = (true, queue.Dequeue());
queue.Enqueue(item);
}
if (droppedEntry.Exists) droppedMessages?.Report(droppedEntry.Item);
}, new ExecutionDataflowBlockOptions()
{
CancellationToken = cancellationToken
});
PropagateCompletion(inputBlock, transformBlock);
PropagateFailure(transformBlock, inputBlock);
PropagateCompletion(transformBlock, outputBlock);
_ = transformBlock.Completion.ContinueWith(_ => { lock (queue) queue.Clear(); },
TaskScheduler.Default);
return DataflowBlock.Encapsulate(inputBlock, outputBlock);
async void PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
{
try { await source.Completion.ConfigureAwait(false); } catch { }
var exception = source.Completion.IsFaulted ? source.Completion.Exception : null;
if (exception != null) target.Fault(exception); else target.Complete();
}
async void PropagateFailure(IDataflowBlock source, IDataflowBlock target)
{
try { await source.Completion.ConfigureAwait(false); } catch { }
if (source.Completion.IsFaulted) target.Fault(source.Completion.Exception);
}
}
// Overload with synchronous lambda
public static IPropagatorBlock<TInput, TOutput>
CreateTransformBlockDropOldest<TInput, TOutput>(
Func<TInput, TOutput> transform,
ExecutionDataflowBlockOptions dataflowBlockOptions = null,
IProgress<TInput> droppedMessages = null)
{
return CreateTransformBlockDropOldest(item => Task.FromResult(transform(item)),
dataflowBlockOptions, droppedMessages);
}
// ActionBlock equivalent
public static ITargetBlock<TInput>
CreateActionBlockDropOldest<TInput>(
Func<TInput, Task> action,
ExecutionDataflowBlockOptions dataflowBlockOptions = null,
IProgress<TInput> droppedMessages = null)
{
if (action == null) throw new ArgumentNullException(nameof(action));
var block = CreateTransformBlockDropOldest<TInput, object>(
async item => { await action(item).ConfigureAwait(false); return null; },
dataflowBlockOptions, droppedMessages);
block.LinkTo(DataflowBlock.NullTarget<object>());
return block;
}
// ActionBlock equivalent with synchronous lambda
public static ITargetBlock<TInput>
CreateActionBlockDropOldest<TInput>(
Action<TInput> action,
ExecutionDataflowBlockOptions dataflowBlockOptions = null,
IProgress<TInput> droppedMessages = null)
{
return CreateActionBlockDropOldest(
item => { action(item); return Task.CompletedTask; },
dataflowBlockOptions, droppedMessages);
}
这个想法是将排队的项目存储在辅助中Queue
,并将虚拟(空)值传递给内部ActionBlock<object>
。该块忽略作为参数传递的项目,并从队列中获取一个项目(如果有)。 αlock
用于确保队列中所有未丢弃的项目最终都会被处理(当然除非发生异常)。
还有一个额外的功能。可选的IProgress<TInput> droppedMessages
参数允许在每次删除消息时接收通知。
使用示例:
_messagingActionBlock = CreateActionBlockDropOldest<string>(msg =>
{
Console.WriteLine($"Processing: {msg}");
Thread.Sleep(5000);
}, new ExecutionDataflowBlockOptions
{
BoundedCapacity = 2,
}, new Progress<string>(msg =>
{
Console.WriteLine($"Message dropped: {msg}");
}));