我想运行一堆异步任务,并限制在任何给定时间可以等待完成的任务数量。
假设您有 1000 个 URL,并且您只想一次打开 50 个请求;但是,一旦一个请求完成,您就会打开与列表中下一个 URL 的连接。这样,每次始终打开 50 个连接,直到 URL 列表耗尽为止。
如果可能的话,我还想利用给定数量的线程。
我想出了一个扩展方法,ThrottleTasksAsync
这就是我想要的。是否已经有更简单的解决方案?我认为这是一个常见的情况。
Usage:
class Program
{
static void Main(string[] args)
{
Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();
Console.WriteLine("Press a key to exit...");
Console.ReadKey(true);
}
}
这是代码:
static class IEnumerableExtensions
{
public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
{
var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());
var semaphore = new SemaphoreSlim(maxConcurrentTasks);
// Run the throttler on a separate thread.
var t = Task.Run(() =>
{
foreach (var item in enumerable)
{
// Wait for the semaphore
semaphore.Wait();
blockingQueue.Add(item);
}
blockingQueue.CompleteAdding();
});
var taskList = new List<Task<Result_T>>();
Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
_ =>
{
Enumerable_T item;
if (blockingQueue.TryTake(out item, 100))
{
taskList.Add(
// Run the task
taskToRun(item)
.ContinueWith(tsk =>
{
// For effect
Thread.Sleep(2000);
// Release the semaphore
semaphore.Release();
return tsk.Result;
}
)
);
}
});
// Await all the tasks.
return await Task.WhenAll(taskList);
}
static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
{
while (!condition()) yield return true;
}
}
该方法利用BlockingCollection
and SemaphoreSlim
使其发挥作用。节流器在一个线程上运行,所有异步任务在另一线程上运行。为了实现并行性,我添加了一个 maxDegreeOfParallelism 参数,该参数传递给Parallel.ForEach
循环重新用作while
loop.
旧版本是:
foreach (var master = ...)
{
var details = ...;
Parallel.ForEach(details, detail => {
// Process each detail record here
}, new ParallelOptions { MaxDegreeOfParallelism = 15 });
// Perform the final batch updates here
}
但是,线程池很快就会耗尽,你不能这样做async
/await
.
Bonus:为了解决这个问题BlockingCollection
抛出异常的地方Take()
when CompleteAdding()
被称为,我正在使用TryTake
超时过载。如果我没有使用超时TryTake
,这会违背使用的目的BlockingCollection
since TryTake
不会阻止。有没有更好的办法?理想情况下,会有一个TakeAsync
method.