你可以使用TransformManyBlock<string, int> https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.transformmanyblock-2作为生产者区块,以及ActionBlock<int>
作为消费者块。这TransformManyBlock
将使用接受的构造函数进行实例化Func<string, IEnumerable<int>>
代表,并通过了迭代器方法 https://learn.microsoft.com/en-us/dotnet/csharp/iterators (the Produce
方法(下例中的方法)会一一生成值:
Random random = new Random();
var producer = new TransformManyBlock<string, int>(Produce);
IEnumerable<int> Produce(string message)
{
if (message == "start")
{
int cnt = 0;
while (cnt < 16)
{
int value;
lock (random) value = random.Next(1, 255);
Console.WriteLine($"Producing #{value}");
yield return value;
Thread.Sleep(1500);
cnt++;
}
}
else
{
yield break;
}
}
var consumer = new ActionBlock<int>(async value =>
{
Console.WriteLine($"Received: {value}");
await Task.Delay(1000);
});
producer.LinkTo(consumer, new() { PropagateCompletion = true });
producer.Post("start");
producer.Complete();
consumer.Completion.Wait();
不幸的是,生产者必须在产生每个值之间的空闲期间阻塞工作线程(Thread.Sleep(1500);
),因为TransformManyBlock
目前没有接受的构造函数Func<string, IAsyncEnumerable<int>>
。这可能会在下一版本中修复TPL数据流 https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library图书馆。你可以追踪this https://github.com/dotnet/runtime/issues/30863GitHub 问题,了解此功能何时发布。
替代解决方案:您可以保持它们不链接,然后手动将生产者生成的值发送给消费者,而不是显式链接生产者和消费者。在这种情况下,两个块都是ActionBlock https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.actionblock-1s:
Random random = new Random();
var consumer = new ActionBlock<int>(async value =>
{
Console.WriteLine($"Received: {value}");
await Task.Delay(1000);
});
var producer = new ActionBlock<string>(async message =>
{
if (message == "start")
{
int cnt = 0;
while (cnt < 16)
{
int value;
lock (random) value = random.Next(1, 255);
Console.WriteLine($"Producing #{value}");
var accepted = await consumer.SendAsync(value);
if (!accepted) break; // The consumer has failed
await Task.Delay(1500);
cnt++;
}
}
});
PropagateCompletion(producer, consumer);
producer.Post("start");
producer.Complete();
consumer.Completion.Wait();
async void PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
{
try { await source.Completion.ConfigureAwait(false); } catch { }
var ex = source.Completion.IsFaulted ? source.Completion.Exception : null;
if (ex != null) target.Fault(ex); else target.Complete();
}
这种方法的主要困难是如何将生产者的完成情况传播给消费者,以便最终两个块都完成。显然你不能使用new DataflowLinkOptions { PropagateCompletion = true }
配置,因为块没有显式链接。你也不能Complete
手动消费者,因为在这种情况下,它会过早地停止接受来自生产者的值。这个问题的解决方案是PropagateCompletion
方法如上例所示。