限制异步任务

2024-03-31

我想运行一堆异步任务,并限制在任何给定时间可以等待完成的任务数量。

假设您有 1000 个 URL,并且您只想一次打开 50 个请求;但是,一旦一个请求完成,您就会打开与列表中下一个 URL 的连接。这样,每次始终打开 50 个连接,直到 URL 列表耗尽为止。

如果可能的话,我还想利用给定数量的线程。

我想出了一个扩展方法,ThrottleTasksAsync这就是我想要的。是否已经有更简单的解决方案?我认为这是一个常见的情况。

Usage:

class Program
{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();

        Console.WriteLine("Press a key to exit...");
        Console.ReadKey(true);
    }
}

这是代码:

static class IEnumerableExtensions
{
    public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
    {
        var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());

        var semaphore = new SemaphoreSlim(maxConcurrentTasks);

        // Run the throttler on a separate thread.
        var t = Task.Run(() =>
        {
            foreach (var item in enumerable)
            {
                // Wait for the semaphore
                semaphore.Wait();
                blockingQueue.Add(item);
            }

            blockingQueue.CompleteAdding();
        });

        var taskList = new List<Task<Result_T>>();

        Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
        _ =>
        {
            Enumerable_T item;

            if (blockingQueue.TryTake(out item, 100))
            {
                taskList.Add(
                    // Run the task
                    taskToRun(item)
                    .ContinueWith(tsk =>
                        {
                            // For effect
                            Thread.Sleep(2000);

                            // Release the semaphore
                            semaphore.Release();

                            return tsk.Result;
                        }
                    )
                );
            }
        });

        // Await all the tasks.
        return await Task.WhenAll(taskList);
    }

    static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
    {
        while (!condition()) yield return true;
    }
}

该方法利用BlockingCollection and SemaphoreSlim使其发挥作用。节流器在一个线程上运行,所有异步任务在另一线程上运行。为了实现并行性,我添加了一个 maxDegreeOfParallelism 参数,该参数传递给Parallel.ForEach循环重新用作while loop.

旧版本是:

foreach (var master = ...)
{
    var details = ...;
    Parallel.ForEach(details, detail => {
        // Process each detail record here
    }, new ParallelOptions { MaxDegreeOfParallelism = 15 });
    // Perform the final batch updates here
}

但是,线程池很快就会耗尽,你不能这样做async/await.

Bonus:为了解决这个问题BlockingCollection抛出异常的地方Take() when CompleteAdding()被称为,我正在使用TryTake超时过载。如果我没有使用超时TryTake,这会违背使用的目的BlockingCollection since TryTake不会阻止。有没有更好的办法?理想情况下,会有一个TakeAsync method.


按照建议,使用TPL数据流 https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library.

A TransformBlock<TInput, TOutput> https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.transformblock-2可能就是您正在寻找的。

你定义一个MaxDegreeOfParallelism限制可以并行转换的字符串数量(即可以下载多少个 url)。然后,您将 URL 发布到该块,完成后,您告诉该块您已完成添加项目并获取响应。

var downloader = new TransformBlock<string, HttpResponse>(
        url => Download(url),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
    );

var buffer = new BufferBlock<HttpResponse>();
downloader.LinkTo(buffer);

foreach(var url in urls)
    downloader.Post(url);
    //or await downloader.SendAsync(url);

downloader.Complete();
await downloader.Completion;

IList<HttpResponse> responses;
if (buffer.TryReceiveAll(out responses))
{
    //process responses
}

注:TransformBlock缓冲其输入和输出。那么,为什么我们需要将其链接到BufferBlock?

因为TransformBlock直到所有项目(HttpResponse)已被消耗,并且await downloader.Completion会挂起。相反,我们让downloader将其所有输出转发到专用缓冲区块 - 然后我们等待downloader完成并检查缓冲区块。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

限制异步任务 的相关文章

随机推荐

  • 向量(插入然后排序)或集合哪种方法更快?

    我有数字序列 未排序 无重复 目标是对它们进行排序 方法一 插入向量 O n 使用排序算法并排序 O nlogn 方法2 插入集合 o nlogn 哪种方法会更快 我觉得设置会更快 因为向量中的每个插入都必须分配 完整的数组元素并复制它然后
  • 通过从客户区的一部分拖动无框窗口来移动它

    正如标题所示 我想仅当用户将窗口从客户区域的一部分拖动时才移动窗口 这将是对正常标题栏移动的模仿 这是因为我的表单是自定义的 并且没有任何标题或标题栏 目前 我使用的代码如下 case WM NCHITTEST return HTCAPTI
  • 你能用 Java 获得基本的 GC 统计数据吗?

    我想让一些长时间运行的服务器应用程序定期输出 Java 中的一般 GC 性能数据 例如 Runtime freeMemory 的 GC 等价物 例如完成的周期数 平均时间等 我们的系统在客户机器上运行 怀疑配置错误的内存池导致了过多的 GC
  • 仅旋转特定 Excel 行中的文本

    我想使用以下命令旋转 Excel 文件中的标题Microsoft Office Interop 为了实现这一目标 我使用以下代码 worksheet Range A1 worksheet UsedRange Columns Count 1
  • HTTP 标头中什么被视为空白

    我刚刚阅读了 HTTP 标准 拟议标准更准确地说 第 1 部分 并对第 3 节倒数第二段中他们认为的 空白 感到困惑 https www rfc editor org rfc rfc7230 section 3 https www rfc
  • 在 Visual Studio 2008 for .NET CF 中处理不同分辨率

    我正在开发一个基于 NET CF 的图形应用程序 我的项目涉及大量绘图图像 我们决定在不同的手机分辨率上移植该应用程序 240 X 240 480 X 640 等 我将如何在单个解决方案 项目中实现这一目标 是否需要根据决议创建不同的项目
  • R 错误:java.lang.OutOfMemoryError:Java 堆空间

    我正在尝试将 R 连接到 Teradata 以将数据直接提取到 R 中进行分析 但是 我收到错误 Error in jcall rp I fetch stride block java lang OutOfMemoryError Java
  • SIP:错误数据连接丢失

    我已经在 android 中使用本机 sip 创建了 sip 应用程序 在其中 我在从 sip 服务器取消注册帐户时遇到问题 每次我得到数据连接丢失我也在android文档中看到 但没有对此错误的简短解释 而且它在注册时面临各种错误 如in
  • AutoMapper 排除字段

    我正在尝试将一个对象映射到另一个对象 但该对象非常复杂 在开发过程中 我希望能够排除一堆字段并逐一访问它们 或者能够指定仅映射我想要的字段 并在每次测试成功时增加字段 So class string field1 string field2
  • 使用 apache 2.4 设置 git-http-backend

    我试图使用 git http backend 和 apache 2 4 设置一个 git 服务器 我发现这个问题 https stackoverflow com questions 26734933 how to set up git ov
  • 如何制作多表头

    I am trying to make a table with 2 headers merged At the moment i made 2 seperate tables with 2 seperate headers and it
  • 用 ssh 替换 telnet

    我有一些程序使用 Net Telnet 模块连接到多个服务器 现在管理员决定将 Telnet 服务替换为 SSH 保留其他所有内容 例如用户帐户 我查看了 Net SSH2 发现我必须更改大部分程序 您是否知道其他更适合相同替代品的 SSH
  • 将 FrameworkElement 及其 DataContext 保存到图像文件未成功

    我有一个名为 UserControl1 的简单 UserControl 其中包含一个 TextBlock
  • 使用正则表达式从字符串中删除日期

    好的 我有一个字符串 title string 它可能类似于以下任何一个 title string 20 08 12 First Test Event title string First Test event 20 08 12 title
  • 为什么 Python 中字典中的项目顺序会改变? [复制]

    这个问题在这里已经有答案了 我正在尝试从一些教程中学习Python 这是我遇到的一个让我困惑的简单例子 gt gt gt d server mpilgrim database master uid sa pwd secret gt gt g
  • Web Worker 和 Canvas 数据

    我看过很多关于网络工作者的帖子
  • 带按钮控件的 DataGridView - 删除行

    我想要在每行的末尾有一个删除按钮DataGridView通过单击我想从绑定列表中删除所需的行 该绑定列表是我的网格的数据源 但我似乎无法做到这一点 我在产品类中创建了一个按钮对象 并使用唯一的 id 实例化它以从列表中删除该对象 但按钮未显
  • 如何桥接 JavaScript(参差不齐)数组和 std::vector> 对象?

    在 JavaScript 中 我有一个 线 列表 每条线都由不定数量的 点 组成 每个点都有以下形式 x y 所以它是一个 3D 参差不齐的数组 现在我需要在 emscripten 的帮助下将它传递给我的 C 代码 embind https
  • Nuget Push 总是返回 404(未找到)

    我尝试将 nuget 包发布到我的 GitHub Packages 帐户 但在所有情况下我都会遇到 404 错误 我已按照 GitHub 网站上的要求进行操作 nuget source Add Name GitHub Source http
  • 限制异步任务

    我想运行一堆异步任务 并限制在任何给定时间可以等待完成的任务数量 假设您有 1000 个 URL 并且您只想一次打开 50 个请求 但是 一旦一个请求完成 您就会打开与列表中下一个 URL 的连接 这样 每次始终打开 50 个连接 直到 U