这是一个类似于集合的低级实现BlockingCollection<T> https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.blockingcollection-1,不同之处在于它会自动完成,而不是依赖于手动调用CompleteAdding https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.blockingcollection-1.completeadding方法。自动完成的条件是集合为空,并且所有消费者都处于等待状态。
/// <summary>
/// A blocking collection that completes automatically when it's empty and all
/// consuming enumerables are in a waiting state.
/// </summary>
public class AutoCompleteBlockingCollection<T>
{
private readonly Queue<T> _queue = new Queue<T>();
private int _consumersCount = 0;
private int _waitingConsumers = 0;
private bool _autoCompleteStarted;
private bool _completed;
public int Count { get { lock (_queue) return _queue.Count; } }
public bool IsCompleted => Volatile.Read(ref _completed);
public void Add(T item)
{
lock (_queue)
{
if (_completed) throw new InvalidOperationException(
"The collection has completed.");
_queue.Enqueue(item);
Monitor.Pulse(_queue);
}
}
/// <summary>
/// Begin observing the condition for automatic completion.
/// </summary>
public void BeginObservingAutoComplete()
{
lock (_queue)
{
if (_autoCompleteStarted) return;
_autoCompleteStarted = true;
Monitor.PulseAll(_queue);
}
}
public IEnumerable<T> GetConsumingEnumerable()
{
bool waiting = false;
lock (_queue) _consumersCount++;
try
{
while (true)
{
T item;
lock (_queue)
{
if (_completed) yield break;
while (_queue.Count == 0)
{
if (_autoCompleteStarted &&
_waitingConsumers == _consumersCount - 1)
{
_completed = true;
Monitor.PulseAll(_queue);
yield break;
}
waiting = true; _waitingConsumers++;
Monitor.Wait(_queue);
waiting = false; _waitingConsumers--;
if (_completed) yield break;
}
item = _queue.Dequeue();
}
yield return item;
}
}
finally
{
lock (_queue)
{
_consumersCount--;
if (waiting) _waitingConsumers--;
if (!_completed && _autoCompleteStarted &&
_waitingConsumers == _consumersCount)
{
_completed = true;
Monitor.PulseAll(_queue);
}
}
}
}
}
The AutoCompleteBlockingCollection<T>
只提供最基本的功能BlockingCollection<T>
班级。不支持有限容量和取消等功能。
使用示例:
var queue = new AutoCompleteBlockingCollection<Node>();
queue.Add(rootNode);
queue.BeginObservingAutoComplete();
Task[] workers = Enumerable.Range(1, 4).Select(_ => Task.Run(() =>
{
foreach (Node node in queue.GetConsumingEnumerable())
{
Process(node);
foreach (Node child in node.Children)
queue.Add(child);
}
})).ToArray();
await Task.WhenAll(workers);
The BeginObservingAutoComplete
应在集合中添加初始项目后调用方法。在调用此方法之前,不检查自动完成条件。在上面的示例中,在开始观察自动完成条件之前仅添加一项。然后启动四个工作线程,每个工作线程消耗该集合,处理每个消耗的节点,然后将该节点的子节点添加到集合中。最终树的所有节点都会被消耗,最后一个活跃的worker将触发收集的自动完成。这将允许所有工作人员退出消费循环并完成。
支持随时(动态)添加和删除消费者。该集合是线程安全的。
可以找到上述集合的功能丰富但效率较低的实现here https://stackoverflow.com/questions/71708984/blockingcollection-where-the-consumers-are-also-producers/71715489#71715489.