2010-11-01 72 views
4

我需要在同一时间异步运行一组繁重的函数,并将结果填充到列表中。下面是这个伪代码:我需要foreach博克内部的逻辑如何并行运行一组函数并在完成后等待结果?

List<TResult> results = new List<TResults>(); 
List<Func<T, TResult>> tasks = PopulateTasks(); 

foreach(var task in tasks) 
{ 
    // Run Logic in question 
    1. Run each task asynchronously/parallely 
    2. Put the results in the results list upon each task completion 
} 

Console.WriteLine("All tasks completed and results populated"); 

。你们可以帮我吗?

我有一些约束:该解决方案必须在.NET 3.5兼容的(不是.NET 4,但.NET 4替代解决方案将不胜感激我的知识的目的)提前

感谢。

+0

另见http://stackoverflow.com/questions/11564506/nesting-await-in-parallel-foreach(我的回答是http://stackoverflow.com/a/25877042/67824) – 2014-10-14 12:43:57

回答

4

一个简单的3.5执行可能看起来像这样

List<TResult> results = new List<TResults>(); 
List<Func<T, TResult>> tasks = PopulateTasks(); 

ManualResetEvent waitHandle = new ManualResetEvent(false); 
void RunTasks() 
{ 
    int i = 0; 
    foreach(var task in tasks) 
    { 
     int captured = i++; 
     ThreadPool.QueueUserWorkItem(state => RunTask(task, captured)) 
    } 

    waitHandle.WaitOne(); 

    Console.WriteLine("All tasks completed and results populated"); 
} 

private int counter; 
private readonly object listLock = new object(); 
void RunTask(Func<T, TResult> task, int index) 
{ 
    var res = task(...); //You haven't specified where the parameter comes from 
    lock (listLock) 
    { 
     results[index] = res; 
    } 
    if (InterLocked.Increment(ref counter) == tasks.Count) 
     waitHandle.Set(); 
} 
4
List<Func<T, TResult>> tasks = PopulateTasks(); 
TResult[] results = new TResult[tasks.Length]; 
Parallel.For(0, tasks.Count, i => 
    { 
     results[i] = tasks[i](); 
    }); 

TPL for 3.5 apparently exists

0

传统的方法是使用Sempahore。使用您正在使用的线程数初始化信号量,然后启动每个任务的线程并等待信号量对象。当每个线程完成时,它应该增加信号量。当信号计数达到0时,正在等待的主线程将继续。

1
public static IList<IAsyncResult> RunAsync<T>(IEnumerable<Func<T>> tasks) 
    { 
     List<IAsyncResult> asyncContext = new List<IAsyncResult>(); 
     foreach (var task in tasks) 
     { 
      asyncContext.Add(task.BeginInvoke(null, null)); 
     } 
     return asyncContext; 
    } 

    public static IEnumerable<T> WaitForAll<T>(IEnumerable<Func<T>> tasks, IEnumerable<IAsyncResult> asyncContext) 
    { 
     IEnumerator<IAsyncResult> iterator = asyncContext.GetEnumerator(); 
     foreach (var task in tasks) 
     { 
      iterator.MoveNext(); 
      yield return task.EndInvoke(iterator.Current); 
     } 
    } 

    public static void Main() 
    { 
     var tasks = Enumerable.Repeat<Func<int>>(() => ComputeValue(), 10).ToList(); 

     var asyncContext = RunAsync(tasks); 
     var results = WaitForAll(tasks, asyncContext); 
     foreach (var result in results) 
     { 
      Console.WriteLine(result); 
     } 
    } 

    public static int ComputeValue() 
    { 
     Thread.Sleep(1000); 
     return Guid.NewGuid().ToByteArray().Sum(a => (int)a); 
    } 
1

另一个变化是用小将来的模式实现:

public class Future<T> 
    { 
     public Future(Func<T> task) 
     { 
      Task = task; 
      _asyncContext = task.BeginInvoke(null, null); 
     } 

     private IAsyncResult _asyncContext; 

     public Func<T> Task { get; private set; } 
     public T Result 
     { 
      get 
      { 
       return Task.EndInvoke(_asyncContext); 
      } 
     } 

     public bool IsCompleted 
     { 
      get { return _asyncContext.IsCompleted; } 
     } 
    } 

    public static IList<Future<T>> RunAsync<T>(IEnumerable<Func<T>> tasks) 
    { 
     List<Future<T>> asyncContext = new List<Future<T>>(); 
     foreach (var task in tasks) 
     { 
      asyncContext.Add(new Future<T>(task)); 
     } 
     return asyncContext; 
    } 

    public static IEnumerable<T> WaitForAll<T>(IEnumerable<Future<T>> futures) 
    { 
     foreach (var future in futures) 
     { 
      yield return future.Result; 
     } 
    } 

    public static void Main() 
    { 
     var tasks = Enumerable.Repeat<Func<int>>(() => ComputeValue(), 10).ToList(); 

     var futures = RunAsync(tasks); 
     var results = WaitForAll(futures); 
     foreach (var result in results) 
     { 
      Console.WriteLine(result); 
     } 
    } 

    public static int ComputeValue() 
    { 
     Thread.Sleep(1000); 
     return Guid.NewGuid().ToByteArray().Sum(a => (int)a); 
    } 
0

做你的处理在单独的工作实例,每个实例上他们自己的线程。使用回调来传回结果并向调用进程发出线程已完成的信号。使用字典来跟踪正在运行的线程。如果你有很多线程,你应该加载一个队列,并在旧线程完成时启动新线程。在本例中,所有线程都是在启动任何线程之前创建的,以防止在最终线程启动之前运行线程数降至零的竞态条件。

Dictionary<int, Thread> activeThreads = new Dictionary<int, Thread>(); 
    void LaunchWorkers() 
    { 
     foreach (var task in tasks) 
     { 
      Worker worker = new Worker(task, new WorkerDoneDelegate(ProcessResult)); 
      Thread thread = new Thread(worker.Done); 
      thread.IsBackground = true; 
      activeThreads.Add(thread.ManagedThreadId, thread); 
     } 
     lock (activeThreads) 
     { 
      activeThreads.Values.ToList().ForEach(n => n.Start()); 
     } 
    } 

    void ProcessResult(int threadId, TResult result) 
    { 
     lock (results) 
     { 
      results.Add(result); 
     } 
     lock (activeThreads) 
     { 
      activeThreads.Remove(threadId); 
      // done when activeThreads.Count == 0 
     } 
    } 
} 

public delegate void WorkerDoneDelegate(object results); 
class Worker 
{ 
    public WorkerDoneDelegate Done; 
    public void Work(Task task, WorkerDoneDelegate Done) 
    { 
     // process task 
     Done(Thread.CurrentThread.ManagedThreadId, result); 
    } 
} 
相关问题