当生产者也是消费者时,如何在生产者/消费者模式中使用 BlockingCollection - 我该如何结束?

2024-01-11

我有一个递归问题,消费者在树的每个级别执行一些工作,然后需要递归树并在下一个级别执行相同的工作。

我想用ConcurrentBag/BlockingCollection等并行运行它。在这种情况下,队列的消费者也是队列的生产者!

我的问题是这样的:使用BlockingCollection,我可以写得很简单foreach将项目出列并将新项目排队的逻辑 - 当队列为空时,阻塞集合将正确阻塞,并等待其他使用者之一产生新工作。

但我怎么知道是否所有消费者都在阻止?!

我知道关于CompleteAdding(),但这似乎不起作用,因为真正完成的唯一时间是所有生产者完成生产并且队列为空时 - 并且由于它们都会阻塞,因此没有人“自由”设置CompleteAdding()。有没有办法检测到这一点? (也许一个事件可以在阻塞时触发,并在解除阻塞时再次触发?)

我可以手动处理这个问题,不使用foreach,但手动有一个while(!complete)循环,并使用TryTake,但随后我需要手动睡眠,这似乎效率低下(首先是阻塞收集与并发收集的全部原因!)每次循环时,如果TryTake是假的,我可以设置一个空闲标志,然后让 Master 检查队列是否为空,并且所有线程都空闲,设置一个完整标志,但同样,这看起来很混乱。

直觉告诉我有某种方法可以使用BlockingCollection做到这一点,但我无法完全做到这一点。

不管怎样,任何人都有一个关于消费者何时是生产者的良好模式,并且能够检测何时释放所有块将是很棒的。


这是一个类似于集合的低级实现BlockingCollection<T> https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.blockingcollection-1,不同之处在于它会自动完成,而不是依赖于手动调用CompleteAdding https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.blockingcollection-1.completeadding方法。自动完成的条件是集合为空,并且所有消费者都处于等待状态。

/// <summary>
/// A blocking collection that completes automatically when it's empty and all
/// consuming enumerables are in a waiting state.
/// </summary>
public class AutoCompleteBlockingCollection<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private int _consumersCount = 0;
    private int _waitingConsumers = 0;
    private bool _autoCompleteStarted;
    private bool _completed;

    public int Count { get { lock (_queue) return _queue.Count; } }
    public bool IsCompleted => Volatile.Read(ref _completed);

    public void Add(T item)
    {
        lock (_queue)
        {
            if (_completed) throw new InvalidOperationException(
                "The collection has completed.");
            _queue.Enqueue(item);
            Monitor.Pulse(_queue);
        }
    }

    /// <summary>
    /// Begin observing the condition for automatic completion.
    /// </summary>
    public void BeginObservingAutoComplete()
    {
        lock (_queue)
        {
            if (_autoCompleteStarted) return;
            _autoCompleteStarted = true;
            Monitor.PulseAll(_queue);
        }
    }

    public IEnumerable<T> GetConsumingEnumerable()
    {
        bool waiting = false;
        lock (_queue) _consumersCount++;
        try
        {
            while (true)
            {
                T item;
                lock (_queue)
                {
                    if (_completed) yield break;
                    while (_queue.Count == 0)
                    {
                        if (_autoCompleteStarted &&
                            _waitingConsumers == _consumersCount - 1)
                        {
                            _completed = true;
                            Monitor.PulseAll(_queue);
                            yield break;
                        }
                        waiting = true; _waitingConsumers++;
                        Monitor.Wait(_queue);
                        waiting = false; _waitingConsumers--;
                        if (_completed) yield break;
                    }
                    item = _queue.Dequeue();
                }
                yield return item;
            }
        }
        finally
        {
            lock (_queue)
            {
                _consumersCount--;
                if (waiting) _waitingConsumers--;
                if (!_completed && _autoCompleteStarted &&
                    _waitingConsumers == _consumersCount)
                {
                    _completed = true;
                    Monitor.PulseAll(_queue);
                }
            }
        }
    }
}

The AutoCompleteBlockingCollection<T>只提供最基本的功能BlockingCollection<T>班级。不支持有限容量和取消等功能。

使用示例:

var queue = new AutoCompleteBlockingCollection<Node>();
queue.Add(rootNode);
queue.BeginObservingAutoComplete();
Task[] workers = Enumerable.Range(1, 4).Select(_ => Task.Run(() =>
{
    foreach (Node node in queue.GetConsumingEnumerable())
    {
        Process(node);
        foreach (Node child in node.Children)
            queue.Add(child);
    }
})).ToArray();
await Task.WhenAll(workers);

The BeginObservingAutoComplete应在集合中添加初始项目后调用方法。在调用此方法之前,不检查自动完成条件。在上面的示例中,在开始观察自动完成条件之前仅添加一项。然后启动四个工作线程,每个工作线程消耗该集合,处理每个消耗的节点,然后将该节点的子节点添加到集合中。最终树的所有节点都会被消耗,最后一个活跃的worker将触发收集的自动完成。这将允许所有工作人员退出消费循环并完成。

支持随时(动态)添加和删除消费者。该集合是线程安全的。

可以找到上述集合的功能丰富但效率较低的实现here https://stackoverflow.com/questions/71708984/blockingcollection-where-the-consumers-are-also-producers/71715489#71715489.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

当生产者也是消费者时,如何在生产者/消费者模式中使用 BlockingCollection - 我该如何结束? 的相关文章

  • “构建”构建我的项目,“构建解决方案”则不构建

    我刚刚开始使用VS2010 我有一个较大的解决方案 已从 VS2008 成功迁移 我已将一个名为 Test 的控制台应用程序项目添加到解决方案中 选择构建 gt 构建解决方案不编译新项目 选择构建 gt 构建测试确实构建了项目 在失败的情况
  • WCF RIA 服务 - 加载多个实体

    我正在寻找一种模式来解决以下问题 我认为这很常见 我正在使用 WCF RIA 服务在初始加载时将多个实体返回给客户端 我希望两个实体异步加载 以免锁定 UI 并且我想利用 RIA 服务来执行此操作 我的解决方案如下 似乎有效 这种方法会遇到
  • 动态加载程序集的应用程序配置

    我正在尝试将模块动态加载到我的应用程序中 但我想为每个模块指定单独的 app config 文件 假设我的主应用程序有以下 app config 设置
  • 不支持将数据直接绑定到存储查询(DbSet、DbQuery、DbSqlQuery)

    正在编码视觉工作室2012并使用实体模型作为我的数据层 但是 当页面尝试加载时 上面提到的标题 我使用 Linq 语句的下拉控件往往会引发未处理的异常 下面是我的代码 using AdventureWorksEntities dw new
  • ASP.NET MVC:这个业务逻辑应该放在哪里?

    我正在开发我的第一个真正的 MVC 应用程序 并尝试遵循一般的 OOP 最佳实践 我正在将控制器中的一些简单业务逻辑重构到我的域模型中 我最近一直在阅读一些内容 很明显我应该将逻辑放在域模型实体类中的某个位置 以避免出现 贫血域模型 反模式
  • 嵌套接口:将 IDictionary> 转换为 IDictionary>?

    我认为投射一个相当简单IDictionary
  • 使用实体框架模型输入安全密钥

    这是我今天的完美想法 Entity Framework 中的强类型 ID 动机 比较 ModelTypeA ID 和 ModelTypeB ID 总是 至少几乎 错误 为什么编译时不处理它 如果您使用每个请求示例 DbContext 那么很
  • 用于登录 .NET 的堆栈跟踪

    我编写了一个 logger exceptionfactory 模块 它使用 System Diagnostics StackTrace 从调用方法及其声明类型中获取属性 但我注意到 如果我在 Visual Studio 之外以发布模式运行代
  • 如何从 appsettings.json 文件中的对象数组读取值

    我的 appsettings json 文件 StudentBirthdays Anne 01 11 2000 Peter 29 07 2001 Jane 15 10 2001 John Not Mentioned 我有一个单独的配置类 p
  • 堆栈溢出:堆栈空间中重复的临时分配?

    struct MemBlock char mem 1024 MemBlock operator const MemBlock b const return MemBlock global void foo int step 0 if ste
  • C#中如何移动PictureBox?

    我已经使用此代码来移动图片框pictureBox MouseMove event pictureBox Location new System Drawing Point e Location 但是当我尝试执行时 图片框闪烁并且无法识别确切
  • 使用 Bearer Token 访问 IdentityServer4 上受保护的 API

    我试图寻找此问题的解决方案 但尚未找到正确的搜索文本 我的问题是 如何配置我的 IdentityServer 以便它也可以接受 授权带有 BearerTokens 的 Api 请求 我已经配置并运行了 IdentityServer4 我还在
  • while 循环中的 scanf

    在这段代码中 scanf只工作一次 我究竟做错了什么 include
  • 垃圾收集器是否在单独的进程中运行?

    垃圾收集器是否在单独的进程中启动 例如 如果我们尝试测量某段代码所花费的进程时间 并且在此期间垃圾收集器开始收集 它会在新进程上启动还是在同一进程中启动 它的工作原理如下吗 Code Process 1 gt Garbage Collect
  • 向现有 TCP 和 UDP 代码添加 SSL 支持?

    这是我的问题 现在我有一个 Linux 服务器应用程序 使用 C gcc 编写 它与 Windows C 客户端应用程序 Visual Studio 9 Qt 4 5 进行通信 是什么very在不完全破坏现有协议的情况下向双方添加 SSL
  • 如何从两个不同的项目中获取文件夹的相对路径

    我有两个项目和一个共享库 用于从此文件夹加载图像 C MainProject Project1 Images 项目1的文件夹 C MainProject Project1 Files Bin x86 Debug 其中有project1 ex
  • 将控制台重定向到 .NET 程序中的字符串

    如何重定向写入控制台的任何内容以写入字符串 对于您自己的流程 Console SetOut http msdn microsoft com en us library system console setout aspx并将其重定向到构建在
  • 混合 ExecutionContext.SuppressFlow 和任务时 AsyncLocal.Value 出现意外值

    在应用程序中 由于 AsyncLocal 的错误 意外值 我遇到了奇怪的行为 尽管我抑制了执行上下文的流程 但 AsyncLocal Value 属性有时不会在新生成的任务的执行范围内重置 下面我创建了一个最小的可重现示例来演示该问题 pr
  • Windows 和 Linux 上的线程

    我在互联网上看到过在 Windows 上使用 C 制作多线程应用程序的教程 以及在 Linux 上执行相同操作的其他教程 但不能同时用于两者 是否存在即使在 Linux 或 Windows 上编译也能工作的函数 您需要使用一个包含两者的实现
  • C++ 中类级 new 删除运算符的线程安全

    我在我的一门课程中重新实现了新 删除运算符 现在我正在使我的代码成为多线程 并想了解这些运算符是否也需要线程安全 我在某处读到 Visual Studio 中默认的 new delete 运算符是线程安全的 但这对于我的类的自定义 new

随机推荐