2009-08-19 67 views
0

我不知道标题是否有意义,但在我写的应用程序中有很多(扩展)方法。一个简单的例子:自动执行循环

对象:

Matter (Burn, Explode, Destroy, Freeze, Heat, Cool) 
Atom (Attach, Detach) 
<many more> 

和一个自定义集合,如:

ImmutableList<T> 

和方法,像这样:

public static class Burner 
{ 
    public static Matter Burn (this Matter matter) 
    { 
     // matter is burning ... 
    } 
} 

var matters = new ImmutableList<Matter>(); 
matters.Burn(); 

正如你可以看到,刻录作品上一个单一的实体,但仍然出现在ImmutableList上。这样我就可以自己管理并行处理(并行)。

我该如何做到最高效,最干净,最可维护的方式,还是结合在一起?

首先,我宁愿不定义另一个需要每个类(燃烧器等)内的ImmutableList的扩展方法,因为有成百上千的像这些,他们可能会看起来相同。但我对创意持开放态度。

此外,所有代码都是我的,所以我可以在代码的任何部分更改/添加任何内容,而不仅仅是扩展方法。

回答

2

这有什么错

matters.ForEach(Burner.Burn); 

与自己实现ForEach

+0

感谢但ForEach并不平行,我想管理自己的并行化以满足应用程序的特定需求。如果可能的话,我宁愿在我的文章中的语法。 – 2009-08-19 18:38:39

+0

也许我没有正确理解你的问题,但是什么阻止你实现自己的'ForEach'方法? – dtb 2009-08-19 18:40:29

+0

我可以,但我想有一个像这样的语法:matters.Burn();否则代码中会出现很多ForEach调用,并不是说这是一件坏事,但它很明显,我想要一个“直接”调用,因此也就是扩展方法。 – 2009-08-19 18:42:01

0

创建自己的ForEachParallel扩展,然后,如果你不想使用PLINQ或东西

1

下面是一个简单的类,以并行的方式进行迭代。

埃姆雷Aydinceren

用法:

Parallel.ForEach(事项,物质=> matter.Burn());

matters.ParallelForEach(物质=> matter.Burn());

/// <summary> 
/// Provides concurrent processing on a sequence of elements 
/// </summary> 
public static class Parallel 
{ 
    /// <summary> 
    /// Number Of parallel tasks 
    /// </summary> 
    public static int NumberOfParallelTasks; 


    static Parallel() 
    { 
     NumberOfParallelTasks = Environment.ProcessorCount < 65 ? Environment.ProcessorCount : 64; 
    } 

    /// <summary> 
    /// Performs the specified action on each element of the sequence in seperate threads. 
    /// </summary> 
    /// <typeparam name="T">The type of the elements of source.</typeparam> 
    /// <param name="source">A sequence that contains elements to perform action</param> 
    /// <param name="action">The Action delegate to perform on each element of the IEnumerable.</param> 
    public static void ForEach<T>(IEnumerable<T> source, Action<T> action) 
    { 
     if(source == null) return; 

     //create a new stack for parallel task we want to run , stack is very performant to add and read elements in sequence 
     var stacks = new Stack<T>[NumberOfParallelTasks]; 

     //instantiate stacks 
     for(var i = 0;i < NumberOfParallelTasks;i++) 
     { 
      stacks[i] = new Stack<T>(); 
     } 

     var itemCount = 0; 

     //spread items in source to all stacks while alternating between stacks 
     foreach(var item in source) 
     { 
      stacks[itemCount++ % NumberOfParallelTasks].Push(item); 
     } 

     if(itemCount==0)return; 

     //if we have less items than number of Parallel tasks we should only spun threads for active stacks 
     var activeStackCount = itemCount < NumberOfParallelTasks ? itemCount : NumberOfParallelTasks; 

     //events are used to notify thread pool completed 
     var events = new ManualResetEvent[activeStackCount]; 

     for(var index = 0;index < activeStackCount;index++) 
     { 
      //assign index to a scope variable otherwise in multithreading index will not be consistant 
      var listIndex = index; 

      events[listIndex] = new ManualResetEvent(false); 

      ThreadPool.QueueUserWorkItem(delegate 
      { 
       //name the thread for debugging 
       if(String.IsNullOrEmpty(Thread.CurrentThread.Name)) 
       { 
        Thread.CurrentThread.Name = String.Format("Parallel.ForEach Worker Thread No:{0}", listIndex); 
       } 

       try 
       { 
        //iterate through our stack 
        var stack = stacks[listIndex]; 
        foreach(var item in stack) 
        { 
         action(item); 
        } 
       } 
       finally 
       { 
        //fire the event to signal WaitHandle that our thread is completed 
        events[listIndex].Set(); 
       } 

      }); 
     } 

     WaitAll(events); 

    } 

    private static void WaitAll(WaitHandle[] waitHandles) 
    { 
     if(Thread.CurrentThread.GetApartmentState() == ApartmentState.STA) 
     { 
      for(var index = 0;index < waitHandles.Length;index++) WaitHandle.WaitAny(waitHandles); 
     } 
     else 
     { 
      WaitHandle.WaitAll(waitHandles); 
     } 
    } 

    /// <summary> 
    /// Performs the specified action on each element of the sequence in seperate threads. 
    /// </summary> 
    /// <typeparam name="T">The type of the elements of source.</typeparam> 
    /// <param name="source">A sequence that contains elements to perform action</param> 
    /// <param name="action">The Action delegate to perform on each element of the IEnumerable.</param> 
    public static void ParallelForEach<T>(this IEnumerable<T> source, Action<T> action) 
    { 
     ForEach(source, action); 
    } 

} 
+0

不应该只有一个并发感知堆栈任务和一组线程。当每个线程启动时,它会检查堆栈的工作情况。如果堆栈为空,则退出。通过这种方式,有些工作很长,有些很短,最终不会有一个线程运行10个快速作业并退出,其他运行10个慢速作业本身。 – jmucchiello 2009-09-11 19:57:16

2

您可能会发现this article是一个有趣的阅读。它讨论了一个平行的foreach是如何工作的,既可以自己做,也可以使用.NET 3.5的Parallel extensions CTP。随着CTP,你可以做到这一点(从文章上面采取为例):

using System.Threading; 

// A simple string collection 
string[] numbers = { "One", "Two", "Three", "Four", "Five", "Six", "Seven", 
    "Eight", "Nine", "Ten", "Eleven", "Twelve", "Thirteen", "Fourteen", "Fifteen"}; 

// equivalent to: foreach (string n in numbers) 
Parallel.ForEach<string>(numbers, delegate(string n) 
{ 
    Console.WriteLine("n={0}", n.ToString()); 
}); 

你应该毫不犹豫地在生产代码使用CTP,除非它只是你自己的项目(在这种情况下,你可能应该要尝试CTP)。