2014-10-28 71 views
13

我有这样的代码:如何并行运行LINQ'let'语句?

var list = new List<int> {1, 2, 3, 4, 5}; 

var result = from x in list.AsParallel() 
      let a = LongRunningCalc1(x) 
      let b = LongRunningCalc2(x) 
      select new {a, b}; 

比方说,LongRunningCalc方法各服1次。上面的代码需要大约2秒的时间才能运行,因为当并行操作5个元素的列表时,从let语句调用的两个方法被顺序调用。

但是,这些方法也可以安全地并行调用。他们显然需要合并回select,但在此之前应该并行运行 - select应该等待它们。

有没有办法做到这一点?

回答

7

您将无法使用查询语法或let操作,但你可以写并行执行的每个项目多个操作的方法:

public static ParallelQuery<TFinal> SelectAll<T, TResult1, TResult2, TFinal>(
    this ParallelQuery<T> query, 
    Func<T, TResult1> selector1, 
    Func<T, TResult2> selector2, 
    Func<TResult1, TResult2, TFinal> resultAggregator) 
{ 
    return query.Select(item => 
    { 
     var result1 = Task.Run(() => selector1(item)); 
     var result2 = Task.Run(() => selector2(item)); 
     return resultAggregator(result1.Result, result2.Result); 
    }); 
} 

这将使你写:

var query = list.AsParallel() 
    .SelectAll(LongRunningCalc1, 
     LongRunningCalc2, 
     (a, b) => new {a, b}) 

你可以额外的并行操作添加过载以及:

public static ParallelQuery<TFinal> SelectAll<T, TResult1, TResult2, TResult3, TFinal> 
    (this ParallelQuery<T> query, 
    Func<T, TResult1> selector1, 
    Func<T, TResult2> selector2, 
    Func<T, TResult3> selector3, 
    Func<TResult1, TResult2, TResult3, TFinal> resultAggregator) 
{ 
    return query.Select(item => 
    { 
     var result1 = Task.Run(() => selector1(item)); 
     var result2 = Task.Run(() => selector2(item)); 
     var result3 = Task.Run(() => selector3(item)); 
     return resultAggregator(
      result1.Result, 
      result2.Result, 
      result3.Result); 
    }); 
} 

它可以写一个版本来处理一些在编译的时候不知道选择的,但要做到这一点,他们都需要计算出相同类型的值:

public static ParallelQuery<IEnumerable<TResult>> SelectAll<T, TResult>(
    this ParallelQuery<T> query, 
    IEnumerable<Func<T, TResult>> selectors) 
{ 
    return query.Select(item => selectors.AsParallel() 
      .Select(selector => selector(item)) 
      .AsEnumerable()); 
} 
public static ParallelQuery<IEnumerable<TResult>> SelectAll<T, TResult>(
    this ParallelQuery<T> query, 
    params Func<T, TResult>[] selectors) 
{ 
    return SelectAll(query, selectors); 
} 
+0

这不是我的专家领域,但你不需要等待任务的结果? – Magnus 2014-10-28 17:56:13

+0

@Magnus我已经是。 – Servy 2014-10-28 17:58:15

+0

Yeh调用'.Result'等待任务执行。我简单地简化了我的示例代码,但这使我走上了正确的道路,感谢您的帮助。 – Lyall 2014-10-28 18:01:53

0

我会做到这一点使用微软的反应框架(NuGet中的“Rx-Main”)。

这就是:

var result = 
    from x in list.ToObservable() 
    from a in Observable.Start(() => LongRunningCalc1(x)) 
    from b in Observable.Start(() => LongRunningCalc2(x)) 
    select new {a, b}; 

的好处是,你可以使用.Subscribe(...)方法访问结果,因为它们产生:

result.Subscribe(x => /* Do something with x.a and/or x.b */); 

超级简单!