Parallel.ForEach 与 BlockingCollection 集成时停止

2023-11-25

我根据中的代码采用了并行/消费者的实现这个问题

class ParallelConsumer<T> : IDisposable
{
    private readonly int _maxParallel;
    private readonly Action<T> _action;
    private readonly TaskFactory _factory = new TaskFactory();
    private CancellationTokenSource _tokenSource;
    private readonly BlockingCollection<T> _entries = new BlockingCollection<T>();
    private Task _task;

    public ParallelConsumer(int maxParallel, Action<T> action)
    {
        _maxParallel = maxParallel;
        _action = action;
    }

    public void Start()
    {
        try
        {
            _tokenSource = new CancellationTokenSource();
            _task = _factory.StartNew(
                () =>
                {
                    Parallel.ForEach(
                        _entries.GetConsumingEnumerable(),
                        new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token },
                        (item, loopState) =>
                        {
                            Log("Taking" + item);
                            if (!_tokenSource.IsCancellationRequested)
                            {
                                _action(item);
                                Log("Finished" + item);
                            }
                            else
                            {
                                Log("Not Taking" + item);
                                _entries.CompleteAdding();
                                loopState.Stop();
                            }
                        });
                },
                _tokenSource.Token);
        }
        catch (OperationCanceledException oce)
        {
            System.Diagnostics.Debug.WriteLine(oce);
        }
    }

    private void Log(string message)
    {
        Console.WriteLine(message);
    }

    public void Stop()
    {
        Dispose();
    }

    public void Enqueue(T entry)
    {
        Log("Enqueuing" + entry);
        _entries.Add(entry);
    }

    public void Dispose()
    {
        if (_task == null)
        {
            return;
        }

        _tokenSource.Cancel();
        while (!_task.IsCanceled)
        {
        }

        _task.Dispose();
        _tokenSource.Dispose();
        _task = null;
    }
}

这是一个测试代码

class Program
{
    static void Main(string[] args)
    {
        TestRepeatedEnqueue(100, 1);
    }

    private static void TestRepeatedEnqueue(int itemCount, int parallelCount)
    {
        bool[] flags = new bool[itemCount];
        var consumer = new ParallelConsumer<int>(parallelCount,
                                              (i) =>
                                              {
                                                  flags[i] = true;
                                              }
            );
        consumer.Start();
        for (int i = 0; i < itemCount; i++)
        {
            consumer.Enqueue(i);
        }
        Thread.Sleep(1000);
        Debug.Assert(flags.All(b => b == true));



    }
}

测试总是失败——它总是停留在 100 项测试中的第 93 项左右。知道我的代码的哪一部分导致了这个问题,以及如何修复它吗?


你不能使用Parallel.Foreach() with BlockingCollection.GetConsumingEnumerable(),正如您所发现的。

有关解释,请参阅此博客文章:

https://devblogs.microsoft.com/pfxteam/parallelextensionsextras-tour-4-blockingcollectionextensions/

摘自博客:

BlockingCollection 的 GetConsumingEnumerable 实现使用 BlockingCollection 的内部同步,它已经支持多个消费者并发,但 ForEach 不知道这一点,并且其可枚举分区逻辑在访问可枚举时也需要加锁。

因此,这里的同步比实际需要的要多,从而导致潜在的不可忽视的性能影响。

[此外] Parallel.ForEach 和 PLINQ 默认采用的分区算法都使用分块来最小化同步成本:它不会为每个元素获取一次锁,而是获取锁,抓取一组元素(一个块) ,然后释放锁。

虽然这种设计有助于提高整体吞吐量,但对于更注重低延迟的场景,这种分块可能会令人望而却步。

该博客还提供了名为的方法的源代码GetConsumingPartitioner()您可以用它来解决问题。

public static class BlockingCollectionExtensions
{

    public static Partitioner<T> GetConsumingPartitioner<T>(this BlockingCollection<T> collection)
    {
        return new BlockingCollectionPartitioner<T>(collection);
    }


    public class BlockingCollectionPartitioner<T> : Partitioner<T>
    {
        private BlockingCollection<T> _collection;

        internal BlockingCollectionPartitioner(BlockingCollection<T> collection)
        {
            if (collection == null)
                throw new ArgumentNullException("collection");

            _collection = collection;
        }

        public override bool SupportsDynamicPartitions
        {
            get { return true; }
        }

        public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
        {
            if (partitionCount < 1)
                throw new ArgumentOutOfRangeException("partitionCount");

            var dynamicPartitioner = GetDynamicPartitions();
            return Enumerable.Range(0, partitionCount).Select(_ => dynamicPartitioner.GetEnumerator()).ToArray();
        }

        public override IEnumerable<T> GetDynamicPartitions()
        {
            return _collection.GetConsumingEnumerable();
        }

    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Parallel.ForEach 与 BlockingCollection 集成时停止 的相关文章

随机推荐

  • 状态机、模型验证和 RSpec

    这是我当前的类定义和规范 class Event lt ActiveRecord Base state machine initial gt not started do event game started do transition n
  • 使用 Perl 模块与使用 system() 调用

    最近 我用 Perl 为 cPanel 插件编写了一些脚本 其中虽然大部分代码都是用 Perl 编写的 但也有相当多的 system 命令以及我用来直接执行 shell 命令的命令 我非常确定我可以使用 Perl 模块来代替 考虑到时间紧迫
  • 对多线程应用程序进行单元测试

    有人对多线程应用程序的单元测试有任何建议或了解任何框架吗 Do not unit测试多线程应用程序 重构代码以消除不同线程中完成的工作之间的耦合 然后分别进行测试
  • 清除 R / RStudio 中的启动屏幕

    我想更改我第一次打开 R 或实际 Rstudio 时看到的启动 登录屏幕 我想要的只是 gt 提示 仅此而已 我知道我以前在网上见过这个 但不记得搜索短语是什么 我应该补充说我正在使用乌班图Linux 有什么建议么 其他人给你建议如何停止这
  • 将选项值设置为选定的

    我想加载一个选择框 其中用户选择的值将自动出现 我正在从服务器接收带有用户信息的 Json 数据 数据样本是 color red 在我的 html 代码中 我有这样的选择选项
  • Sencha Touch MVC——通过控制器传递数据的推荐方式?

    我正在使用 Sencha Touch 作为移动应用程序 并使用其中的 MVC 功能 我非常喜欢 Sencha 但在使用控制器将数据从一个 屏幕 传递到下一个 屏幕 时遇到了一些麻烦 有一个与许多 Sencha 小部件关联的 记录 属性 例如
  • UnicodeDecodeError:“charmap”编解码器无法解码位置 XXX 中的字节 0x8f:char

    我正在尝试从 python 脚本读取一个日志文件 我的程序在 Linux 中运行良好 但在 Windows 中遇到错误 在读取特定行号的某些行后 我收到以下错误 File C Python lib encodings cp1252 py l
  • 如何在Xamarin中查找当前的UIViewController

    我正在使用Facebook 身份验证 SDK 使用 Xamarin Forms C example 然而 Facebook SDK 已经弃用了该方法 并将其替换为添加了fromViewController变量到构造函数中 我对 Xamari
  • Fluent Wait 和 WebDriver Wait - 差异

    我都看过FluentWait and WebDriverWait在使用 Selenium 的代码中 FluentWait使用轮询技术 即它将在每个固定间隔轮询特定的WebElement 我想知道有什么作用WebDriverWait做与Exp
  • python websocket 握手(RFC 6455)

    我正在尝试使用 RFC 6455 协议在 python 上实现一个简单的 websocket 服务器 我采用了握手格式here and here 我使用 Chromium 17 和 Firefox 11 作为客户端 并收到此错误 Uncau
  • Android AdMob - 请求欧洲用户同意

    我实施了欧盟用户同意政策 with Android AdMob根据本文 问题 UserMessagingPlatform总是失败onConsentInfoUpdateFailure在真实设备 带有 HashedId 或模拟器上 错误 服务器
  • Flask-SQLalchemy 更新一行信息

    如何更新行的信息 例如 我想更改 id 为 5 的行的名称列 使用检索对象Flask SQLAlchemy 文档中显示的教程 获得要更改的实体后 请更改实体本身 然后 db session commit 例如 admin User quer
  • 如何从 SQL Server 发送电子邮件?

    如何使用 T SQL 发送电子邮件 但电子邮件地址存储在表中 我想遍历表格并能够发送电子邮件 到目前为止 我找不到这样做的好例子 步骤 1 创建个人资料和帐户 您需要使用配置数据库邮件向导创建配置文件和帐户 可以从管理节点中数据库邮件节点的
  • ADB 在 Eclipse 中崩溃

    这个问题涉及到这个线程 但是该线程没有答案 因此从技术上讲这不是重复的 我已经在 Windows 7 x64 和 Eclipse SDK 3 6 2 上运行 ADB 1 0 26 并使用 ADT 10 0 1 SDK 工具 r10 并且已经
  • 如何在 Kotlin 中基于/比较多个值进行排序?

    说我有一个class Foo val a String val b Int val c Date 我想对列表进行排序Foos 基于所有三个属性 我该怎么办呢 Kotlin 的 stdlib 为此提供了许多有用的辅助方法 首先 您可以使用以下
  • 将 .html 文件转换为图像 [关闭]

    Closed 这个问题是无关 目前不接受答案 我正在寻找任何能够忠实地将 html 文件转换为图像格式的实用程序 最好是 png 或 jpeg 我在谷歌上搜索了几个小时 下载了十几个免费试用版 但似乎没有任何效果 Adobe Acrobat
  • npm install -g less 不起作用:EACCES:权限被拒绝

    我正在尝试在 phpstorm 上设置 less 以便我可以在保存时将 less 文件编译为 css 我已经安装了node js和下一步 根据这个https www jetbrains com webstorm help transpili
  • Android ListView 具有多个选择和自定义适配器

    我有一个ListView使用自定义适配器 这ListView允许多项选择 但不知何故它无法识别何时选择了某个项目 我已经使适配器项目扩展为 Checkable 但仍然getCheckedItemPositions 返回一个包含 false
  • 使用控制台让 python 在 Windows XP 上以 UTF8 打印

    我想在 Windows XP 上配置我的控制台以支持 UTF8 并让 python 检测到它并使用它 到目前为止 我的尝试 C Documents and Settings Philippe gt C Python25 python exe
  • Parallel.ForEach 与 BlockingCollection 集成时停止

    我根据中的代码采用了并行 消费者的实现这个问题 class ParallelConsumer