好问题!
首先,我不确定LimitedConcurrencyLevelTaskScheduler
是学术上正确的解决方案。为了将并发请求数限制为 N,您必须阻止 N 个任务,这从一开始就违背了使用 APM 异步调用的目的。
话虽如此,它比替代方案更容易实施。您需要有一个工作队列并记录正在进行的请求数量,然后根据需要创建工作任务。要做到这一点并不简单,如果并发请求的数量 N 很小,那么拥有 N 个阻塞线程并不是世界末日。
因此,您的代码的问题在于,在其他任务中创建的任务使用父任务中的调度程序。实际上,对于使用以下命令创建的任务来说并非如此FromAsync
因为它们使用底层 APM 实现,所以有点不同。
您在中创建任务Main
with:
return factory.StartNew( () =>
{
var task = getTask();
task.Wait();
}
);
factory
使用LimitedConcurrencyLevelTaskScheduler( 1 )
,因此这些任务中只有 1 个可以并发执行,并且其中一个正在等待从getTask()
.
So, in GetReadTask
你打电话Task<int>.Factory.FromAsync
。这是因为FromAsync
不尊重父任务的调度程序。
然后你创建一个延续.ContinueWith(task => fileStream.Close())
。这将创建一个尊重其父级调度程序的任务。自从LimitedConcurrencyLevelTaskScheduler
已经在执行一项任务(Main
被阻止了)延续无法运行并且出现死锁。
解决方案是在普通线程池线程上运行延续TaskScheduler.Default
。然后它开始运行,僵局被打破。
这是我的解决方案:
static Task QueueReadTask( TaskScheduler ts, int number )
{
Output.Write( "QueueReadTask( " + number + " )" );
return Task.Factory.StartNew( () =>
{
Output.Write( "Opening file " + number + "." );
FileStream fileStream = File.Open( "D:\\1KB.txt", FileMode.Open, FileAccess.Read, FileShare.Read );
byte[] buffer = new byte[ 32 ];
var tRead = Task<int>.Factory.FromAsync( fileStream.BeginRead, fileStream.EndRead, buffer, 0, 32, null );
var tClose = tRead.ContinueWith( task =>
{
Output.Write( "Closing file " + number + ". Read " + task.Result + " bytes." );
fileStream.Close();
}
, TaskScheduler.Default
);
tClose.Wait();
}
, CancellationToken.None
, TaskCreationOptions.None
, ts
);
}
And Main
现在看起来像这样:
static void Main()
{
LimitedConcurrencyLevelTaskScheduler ts = new LimitedConcurrencyLevelTaskScheduler( 1 );
int[] range = { 1, 2, 3 };
var tasks = range.Select( number =>
{
var task = QueueReadTask( ts, number );
return task.ContinueWith( t => Output.Write( "Number " + number + " completed" ) );
}
)
.ToArray();
Output.Write( "Waiting for " + tasks.Length + " tasks: " + String.Join( " ", tasks.Select( t => t.Status ).ToArray() ) );
Task.WaitAll( tasks );
Output.Write( "WaitAll complete for " + tasks.Length + " tasks: " + String.Join( " ", tasks.Select( t => t.Status ).ToArray() ) );
}
有几点需要注意:
移动task.Wait()
into QueueReadTask
更明显地表明您正在阻止任务。您可以删除FromAsync
调用和延续,并将它们替换为正常的同步调用,因为无论如何你都会阻塞。
任务返回自QueueReadTask
可以有延续。默认情况下,它们在默认调度程序下运行,因为它们继承父任务的调度程序而不是前件任务的调度程序。在这种情况下,没有父任务,因此使用默认调度程序。