2012-07-19 55 views
97

在metro应用程序中,我需要执行一些WCF调用。有大量的呼叫要做,所以我需要做一个并行循环。问题是并行循环在WCF调用全部完成之前退出。嵌套等待在Parallel.ForEach

你会如何重构这个按预期工作?

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" }; 
var customers = new System.Collections.Concurrent.BlockingCollection<Customer>(); 

Parallel.ForEach(ids, async i => 
{ 
    ICustomerRepo repo = new CustomerRepo(); 
    var cust = await repo.GetCustomer(i); 
    customers.Add(cust); 
}); 

foreach (var customer in customers) 
{ 
    Console.WriteLine(customer.ID); 
} 

Console.ReadKey(); 

回答

101

Parallel.ForEach()背后的全部想法是,你有一组线程,每个线程处理集合的一部分。正如您注意到的,这不适用于async - await,您希望在异步调用期间释放该线程。

您可以通过阻止ForEach()线程来“修复”,但是这会破坏整个点async - await

你可以做的是使用TPL Dataflow而不是Parallel.ForEach(),它支持异步Task

具体来说,您的代码可以使用TransformBlock来编写,它使用async lambda将每个id转换为Customer。该块可以配置为并行执行。您可以将该块链接到ActionBlock,该ActionBlock将每个Customer写入控制台。 设置完成后,您可以将Post()的每个ID设置为TransformBlock

在代码:

var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" }; 

var getCustomerBlock = new TransformBlock<string, Customer>(
    async i => 
    { 
     ICustomerRepo repo = new CustomerRepo(); 
     return await repo.GetCustomer(i); 
    }, new ExecutionDataflowBlockOptions 
    { 
     MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded 
    }); 
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID)); 
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions 
    { 
     PropagateCompletion = true 
    }); 

foreach (var id in ids) 
    getCustomerBlock.Post(id); 

getCustomerBlock.Complete(); 
writeCustomerBlock.Completion.Wait(); 

虽然你可能想在TransformBlock的并行限制一些小的常量。另外,您可以限制TransformBlock的容量,并使用SendAsync()异步添加项目,例如,如果该集合太大。

与您的代码相比(如果有效的话),作为一个额外的好处是只要单个项目完成就可以开始写入,而不是等到所有处理完成。

+1

一个非常简要概述,反应性扩展,TPL和TPL数据流 - http://vantsuyoshi.wordpress.com/2012/01/05/when-to-use-tpl-async-reactive-extension-tpl-dataflow /对于像我这样可能需要一些清晰度的人。 – 2013-09-13 11:04:54

+1

我很确定这个答案不会并行处理。我相信你需要在id上做一个Parallel.ForEach并将它们发布到getCustomerBlock。至少这是我在测试这个建议时发现的。 – JasonLind 2015-12-16 22:23:26

+2

@JasonLind它确实如此。并行使用'Parallel.ForEach()''Post()'项目不应该有任何实际效果。 – svick 2015-12-16 22:26:02

79

svick's answer是(像往常一样)优秀。

但是,如果实际上有大量数据要传输,我发现Dataflow更有用。或者当您需要一个兼容async的队列时。

在你的情况,一个简单的解决方法就是使用async风格的并行:

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" }; 

var customerTasks = ids.Select(i => 
    { 
    ICustomerRepo repo = new CustomerRepo(); 
    return repo.GetCustomer(i); 
    }); 
var customers = await Task.WhenAll(customerTasks); 

foreach (var customer in customers) 
{ 
    Console.WriteLine(customer.ID); 
} 

Console.ReadKey(); 
+7

如果你想手动限制并行(在这种情况下你很可能会这样做),这样做会更复杂。 – svick 2012-07-19 16:50:56

+1

好点。数据流具有很好的旋钮。 – 2012-07-19 16:51:37

+0

但是你说得对,Dataflow可能非常复杂(例如,与'Parallel.ForEach()'相比)。但我认为它是目前最好的选择,可以对集合进行几乎所有的“异步”工作。 – svick 2012-07-19 16:53:10

47

使用数据流为svick建议可能是矫枉过正,和斯蒂芬的回答并没有提供手段来控制并发操作。然而,可以相当简单地实现:

public static async Task RunWithMaxDegreeOfConcurrency<T>(
    int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory) 
{ 
    var activeTasks = new List<Task>(maxDegreeOfConcurrency); 
    foreach (var task in collection.Select(taskFactory)) 
    { 
     activeTasks.Add(task); 
     if (activeTasks.Count == maxDegreeOfConcurrency) 
     { 
      await Task.WhenAny(activeTasks.ToArray()); 
      //observe exceptions here 
      activeTasks.RemoveAll(t => t.IsCompleted); 
     } 
    } 
    await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t => 
    { 
     //observe exceptions in a manner consistent with the above 
    }); 
} 

ToArray()调用可以通过优化使用数组而不是列表和更换完成的任务,但我怀疑这会挣很多在大多数情况下的差别。每OP的问题示例用法:

RunWithMaxDegreeOfConcurrency(10, ids, async i => 
{ 
    ICustomerRepo repo = new CustomerRepo(); 
    var cust = await repo.GetCustomer(i); 
    customers.Add(cust); 
}); 

编辑研究员SO用户和TPL奇才Eli Arbel向我指出一个related article from Stephen Toub。像往常一样,他的实现既优雅又高效:

public static Task ForEachAsync<T>(
     this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
     from partition in Partitioner.Create(source).GetPartitions(dop) 
     select Task.Run(async delegate { 
      using (partition) 
       while (partition.MoveNext()) 
        await body(partition.Current).ContinueWith(t => 
          { 
           //observe exceptions 
          }); 

     })); 
} 
+0

爱Eli阿贝尔选项。有几个后续问题:我很想跟踪进展情况。我在方法中添加了'ref int done',然后'ContinueWith done ++'但是“不能在匿名方法,lambda表达式或查询表达式中使用ref或out参数......任何想法如何跟踪进度? – Stefanvds 2015-07-30 07:29:48

+0

没关系,我可以在foreachasync代码中坚持完成++ – Stefanvds 2015-07-30 07:43:38

+0

我发现ForEachAsync代码不能按预期工作,至少,并非总是出于某种原因(我目前无法解释发生了什么)。使用'dop = 5'在调用代码时我会得到不同的结果(我应该始终得到相同的结果 - 数据不会改变)!小心! – Shaamaan 2015-11-13 15:04:34

7

包裹Parallel.Foreach成Task.Run()和而不是等待关键字使用[yourasyncmethod]。结果

(你需要做Task.Run事情不会阻止UI线程)

事情是这样的:

var yourForeachTask = Task.Run(() => 
     { 
      Parallel.ForEach(ids, i => 
      { 
       ICustomerRepo repo = new CustomerRepo(); 
       var cust = repo.GetCustomer(i).Result; 
       customers.Add(cust); 
      }); 
     }); 
await yourForeachTask; 
+3

这是什么问题?我已经完成了它,让'Parallel.ForEach'完成并行工作,直到所有的都完成了,然后把整个事情推到一个后台线程来获得一个响应式的用户界面,这有什么问题吗?也许这是一个睡眠线程太多,但它是短的,可读的代码 – ygoe 2015-06-17 18:22:41

+0

@LonelyPixel我唯一的问题是它调用'Task.Run' w母鸡'TaskCompletionSource'是可取的。 – Gusdor 2016-03-30 13:31:43

+1

@Gusdor好奇 - 为什么'TaskCompletionSource'更可取? – Seafish 2016-07-13 14:34:08

6

这应该是比让整个TPL数据流的工作相当有效,也更容易荷兰国际集团:

var customers = await ids.SelectAsync(async i => 
{ 
    ICustomerRepo repo = new CustomerRepo(); 
    return await repo.GetCustomer(i); 
}); 

... 

public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4) 
{ 
    var results = new List<TResult>(); 

    var activeTasks = new HashSet<Task<TResult>>(); 
    foreach (var item in source) 
    { 
     activeTasks.Add(selector(item)); 
     if (activeTasks.Count >= maxDegreesOfParallelism) 
     { 
      var completed = await Task.WhenAny(activeTasks); 
      activeTasks.Remove(completed); 
      results.Add(completed.Result); 
     } 
    } 

    results.AddRange(await Task.WhenAll(activeTasks)); 
    return results; 
} 
+0

不应该使用'await'如下:'var customers = await ids.SelectAsync(async i => {...});'? – Paccc 2014-12-14 04:02:25

+0

@pacc:你说的没错。固定。 – 2014-12-14 04:20:34

1

我有点晚了党,但你可能要考虑使用GetAwaiter.GetResult()运行在同步方面的异步代码,但下面平行中;

Parallel.ForEach(ids, i => 
{ 
    ICustomerRepo repo = new CustomerRepo(); 
    // Run this in thread which Parallel library occupied. 
    var cust = repo.GetCustomer(i).GetAwaiter().GetResult(); 
    customers.Add(cust); 
}); 
7

您可以使用新的AsyncEnumerator NuGet Package节省工作量,这在4年前当问题最初发布时并不存在。它可以让你控制并行度:

using System.Collections.Async; 
... 

await ids.ParallelForEachAsync(async i => 
{ 
    ICustomerRepo repo = new CustomerRepo(); 
    var cust = await repo.GetCustomer(i); 
    customers.Add(cust); 
}, 
maxDegreeOfParallelism: 10); 

免责声明:我是AsyncEnumerator库,它是开源的,MIT许可下的作者,和我张贴此消息只是为了帮助社区。

+4

谢尔盖,你应该透露你是图书馆 – 2017-12-16 22:55:04

+2

好的作者,补充了免责声明。我不想从广告中获得任何好处,只想帮助别人;) – 2018-02-24 18:01:23

+0

非常有用的图书馆,希望很快就会被纳入Core – rekiem87 2018-02-27 23:09:20

1

引入了一堆的辅助方法后,您将能够运行并行查询这个简单的sintax:

const int DegreeOfParallelism = 10; 
IEnumerable<double> result = await Enumerable.Range(0, 1000000) 
    .Split(DegreeOfParallelism) 
    .SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false)) 
    .ConfigureAwait(false); 

这里发生的是我们分裂源收集到10块(.Split(DegreeOfParallelism)),然后运行10个任务每个处理它的项目一个接一个(.SelectManyAsync(...)),并将它们合并到一个列表中。

值得一提的是有个简单的方法:

double[] result2 = await Enumerable.Range(0, 1000000) 
    .Select(async i => await CalculateAsync(i).ConfigureAwait(false)) 
    .WhenAll() 
    .ConfigureAwait(false); 

但它需要一个预防:如果你有一个源的集合,是太大了,它会chedule为每个项目一个Task向右走,这可能会导致显着的性能点击。在上述外表实施例中使用

扩展方法如下:异步的

public static class CollectionExtensions 
{ 
    /// <summary> 
    /// Splits collection into number of collections of nearly equal size. 
    /// </summary> 
    public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount) 
    { 
     if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount)); 

     List<T> source = src.ToList(); 
     var sourceIndex = 0; 
     for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++) 
     { 
      var list = new List<T>(); 
      int itemsLeft = source.Count - targetIndex; 
      while (slicesCount * list.Count < itemsLeft) 
      { 
       list.Add(source[sourceIndex++]); 
      } 

      yield return list; 
     } 
    } 

    /// <summary> 
    /// Takes collection of collections, projects those in parallel and merges results. 
    /// </summary> 
    public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
     this IEnumerable<IEnumerable<T>> source, 
     Func<T, Task<TResult>> func) 
    { 
     List<TResult>[] slices = await source 
      .Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false)) 
      .WhenAll() 
      .ConfigureAwait(false); 
     return slices.SelectMany(s => s); 
    } 

    /// <summary>Runs selector and awaits results.</summary> 
    public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector) 
    { 
     List<TResult> result = new List<TResult>(); 
     foreach (TSource source1 in source) 
     { 
      TResult result1 = await selector(source1).ConfigureAwait(false); 
      result.Add(result1); 
     } 
     return result; 
    } 

    /// <summary>Wraps tasks with Task.WhenAll.</summary> 
    public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source) 
    { 
     return Task.WhenAll<TResult>(source); 
    } 
}