2011-01-12 55 views
14

关于等待任务完成并线程同步的主题。Parallel.ForEach - 优雅取消

我目前有一个迭代,我已经包含在一个Parallel.ForEach。在下面的例子中,我在关于如何最好地处理循环的正常终止(.NET 4.0)的评论中提出了一些问题;

private void myFunction() 
    { 

     IList<string> iListOfItems = new List<string>(); 
     // populate iListOfItems 

     CancellationTokenSource cts = new CancellationTokenSource(); 

     ParallelOptions po = new ParallelOptions(); 
     po.MaxDegreeOfParallelism = 20; // max threads 
     po.CancellationToken = cts.Token; 

     try 
     { 
      var myWcfProxy = new myWcfClientSoapClient(); 

      if (Parallel.ForEach(iListOfItems, po, (item, loopsate) => 
      { 
       try 
       { 
        if (_requestedToStop) 
         loopsate.Stop(); 
        // long running blocking WS call, check before and after 
        var response = myWcfProxy.ProcessIntervalConfiguration(item); 
        if (_requestedToStop) 
         loopsate.Stop(); 

        // perform some local processing of the response object 
       } 
       catch (Exception ex) 
       { 
        // cannot continue game over. 
        if (myWcfProxy.State == CommunicationState.Faulted) 
        { 
         loopsate.Stop(); 
         throw; 
        } 
       } 

       // else carry on.. 
       // raise some events and other actions that could all risk an unhanded error. 

      } 
      ).IsCompleted) 
      { 
       RaiseAllItemsCompleteEvent(); 
      } 
     } 
     catch (Exception ex) 
     { 
      // if an unhandled error is raised within one of the Parallel.ForEach threads, do all threads in the 
      // ForEach abort? or run to completion? Is loopsate.Stop (or equivalent) called as soon as the framework raises an Exception? 
      // Do I need to call cts.Cancel here? 

      // I want to wait for all the threads to terminate before I continue at this point. Howe do we achieve that? 

      // do i need to call cts.Dispose() ? 

      MessageBox.Show(Logging.FormatException(ex)); 
     } 
     finally 
     { 

      if (myWcfProxy != null) 
      { 
      // possible race condition with the for-each threads here unless we wait for them to terminate. 
       if (myWcfProxy.State == System.ServiceModel.CommunicationState.Faulted) 
        myWcfProxy.Abort(); 

       myWcfProxy.Close(); 
      } 

      // possible race condition with the for-each threads here unless we wait for them to terminate. 
      _requestedToStop = false; 

     } 

    } 

任何帮助将不胜感激。 MSDN文档讲述了ManualResetEventSlim和cancellationToken.WaitHandle的。但不知道如何将它们连接到它,似乎在努力理解MSDN示例,因为大多数不适用。

回答

8

我嘲笑了下面的一些代码可能会回答你的问题。基本的一点是,你可以通过Parallel.ForEach获得fork/join并行性,所以你不需要担心并行任务之外的竞争条件(调用线程会阻​​塞,直到任务成功完成或以其他方式完成)。你只是想确保使用LoopState变量(lambda的第二个参数)来控制你的循环状态。

如果循环的任何迭代引发了未处理的异常,则整个循环将引发最后捕获的AggregateException。

提及这个话题其他链接:

Parallel.ForEach throws exception when processing extremely large sets of data

http://msdn.microsoft.com/en-us/library/dd460720.aspx

Does Parallel.ForEach limits the number of active threads?

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading; 
using System.Threading.Tasks; 
using System.ServiceModel; 

namespace Temp 
{ 
    public class Class1 
    { 
     private class MockWcfProxy 
     { 
      internal object ProcessIntervalConfiguration(string item) 
      { 
       return new Object(); 
      } 

      public CommunicationState State { get; set; } 
     } 

     private void myFunction() 
     { 

      IList<string> iListOfItems = new List<string>(); 
      // populate iListOfItems 

      CancellationTokenSource cts = new CancellationTokenSource(); 

      ParallelOptions po = new ParallelOptions(); 
      po.MaxDegreeOfParallelism = 20; // max threads 
      po.CancellationToken = cts.Token; 

      try 
      { 
       var myWcfProxy = new MockWcfProxy(); 

       if (Parallel.ForEach(iListOfItems, po, (item, loopState) => 
        { 
         try 
         { 
          if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional) 
           loopState.Stop(); 

          // long running blocking WS call, check before and after 
          var response = myWcfProxy.ProcessIntervalConfiguration(item); 

          if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional) 
           loopState.Stop(); 

          // perform some local processing of the response object 
         } 
         catch (Exception ex) 
         { 
          // cannot continue game over. 
          if (myWcfProxy.State == CommunicationState.Faulted) 
          { 
           loopState.Stop(); 
           throw; 
          } 

          // FYI you are swallowing all other exceptions here... 
         } 

         // else carry on.. 
         // raise some events and other actions that could all risk an unhanded error. 
        } 
       ).IsCompleted) 
       { 
        RaiseAllItemsCompleteEvent(); 
       } 
      } 
      catch (AggregateException aggEx) 
      { 
       // This section will be entered if any of the loops threw an unhandled exception. 
       // Because we re-threw the WCF exeption above, you can use aggEx.InnerExceptions here 
       // to see those (if you want). 
      } 
      // Execution will not get to this point until all of the iterations have completed (or one 
      // has failed, and all that were running when that failure occurred complete). 
     } 

     private void RaiseAllItemsCompleteEvent() 
     { 
      // Everything completed... 
     } 
    } 
} 
+0

感谢您的见解。我应该说,在您正确地指出“在此处吞食所有其他异常”的时候,我正在进行日志记录调用,它将记录Web服务或客户端WCF异常。如果异常不会导致无效的WCF代理,则循环的目的是继续。我预计超时错误或服务器端故障例外。对于这个特定的功能来说不需要任何缓解功能。但是,我们将审查日志文件,并调查任何此类异常。 – Terry 2011-01-13 14:12:53