Parallel.ForEach()
背后的全部想法是,你有一组线程,每个线程处理集合的一部分。正如您注意到的,这不适用于async
- await
,您希望在异步调用期间释放该线程。
您可以通过阻止ForEach()
线程来“修复”,但是这会破坏整个点async
- await
。
你可以做的是使用TPL Dataflow而不是Parallel.ForEach()
,它支持异步Task
。
具体来说,您的代码可以使用TransformBlock
来编写,它使用async
lambda将每个id转换为Customer
。该块可以配置为并行执行。您可以将该块链接到ActionBlock
,该ActionBlock
将每个Customer
写入控制台。 设置完成后,您可以将Post()
的每个ID设置为TransformBlock
。
在代码:
var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var getCustomerBlock = new TransformBlock<string, Customer>(
async i =>
{
ICustomerRepo repo = new CustomerRepo();
return await repo.GetCustomer(i);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
writeCustomerBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
foreach (var id in ids)
getCustomerBlock.Post(id);
getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();
虽然你可能想在TransformBlock
的并行限制一些小的常量。另外,您可以限制TransformBlock
的容量,并使用SendAsync()
异步添加项目,例如,如果该集合太大。
与您的代码相比(如果有效的话),作为一个额外的好处是只要单个项目完成就可以开始写入,而不是等到所有处理完成。
一个非常简要概述,反应性扩展,TPL和TPL数据流 - http://vantsuyoshi.wordpress.com/2012/01/05/when-to-use-tpl-async-reactive-extension-tpl-dataflow /对于像我这样可能需要一些清晰度的人。 – 2013-09-13 11:04:54
我很确定这个答案不会并行处理。我相信你需要在id上做一个Parallel.ForEach并将它们发布到getCustomerBlock。至少这是我在测试这个建议时发现的。 – JasonLind 2015-12-16 22:23:26
@JasonLind它确实如此。并行使用'Parallel.ForEach()''Post()'项目不应该有任何实际效果。 – svick 2015-12-16 22:26:02