2015-02-09 103 views
3

为线程完成一个队列线路的最佳方式是什么,以便我只能拥有最大数量的线程,并且如果我已经有很多代码在等待空闲槽之前继续..在继续执行代码之前等待空闲的线程槽号

我的意思伪codeish例子,我确信这可以以更好的方式来完成...

(请检查下面的附加要求)

private int _MaxThreads = 10; 
private int _CurrentThreads = 0; 

public void main(string[] args) 
{ 
    List<object> listWithLotsOfItems = FillWithManyThings(); 

    while(listWithLotsOfItems.Count> 0) 
    { 
     // get next item that needs to be worked on 
     var item = listWithLotsOfItems[0]; 
     listWithLotsOfItems.RemoveAt(0); 

     // IMPORTANT!, more items can be added as we go. 
     listWithLotsOfItems.AddRange(AddMoreItemsToBeProcessed()); 

     // wait for free thread slot 
     while (_CurrentThreads >= _MaxThreads) 
      Thread.Sleep(100); 

     Interlocked.Increment(ref _CurrentThreads); // risk of letting more than one thread through here... 
     Thread t = new Thread(new ParameterizedThreadStart(WorkerThread(item)); 
     t.Start(); 
    } 
} 

public void WorkerThread(object bigheavyObject) 
{ 
    // do heavy work here 
    Interlocked.Decrement(ref _CurrentThreads); 
} 

看着Sempahore但似乎需要在t内运行他在线程创建之前并不在外线。在这个例子中,Semaphore在线程内部被创建后用于暂停它,在我的情况下,可能有超过10万个线程需要在作业完成之前运行,所以我宁愿在插槽可用之前创建线程。 (link to semaphore example

在实际应用中,数据可以被添加到项目列表的程序进行所以Parallel.ForEach将没有真正的工作是(我在SSIS这样的脚本组件包发送数据到一个非常慢的WCF)。

SSIS具有.NET 4.0

+0

您是否尝试过使用Semaphore和SemaphoreSlim? https://msdn.microsoft.com/en-us/library/system.threading.semaphoreslim%28v=vs.110%29.aspx – 2015-02-09 15:40:56

+0

@ Benji_9989是的,请看我添加的评论。 – JensB 2015-02-09 15:42:56

+0

倾向于使用类似['Parallel.ForEach']的构造(https://msdn.microsoft.com/en-us/library/system.threading.tasks.parallel.foreach%28v=vs.110%29.aspx )使用MaxDegreeOfParallelism选项,而不是手动创建和管理线程(如果可能的话)。 – 2015-02-09 15:46:07

回答

0

您是否使用等待句柄考虑?请参阅this

此外,您还可以使用Parallel.Foreach为您管理线程创建。

希望它帮助;)

2

要做到这一点,最简单的方法是使用的Parallel.ForEach()过载,它允许您选择MaxDegreeOfParallelism

下面是一个示例程序:

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Threading; 
using System.Threading.Tasks; 

namespace Demo 
{ 
    public static class Program 
    { 
     private static void Main() 
     { 
      List<int> items = Enumerable.Range(1, 100).ToList(); 

      Parallel.ForEach(items, new ParallelOptions {MaxDegreeOfParallelism = 5}, process); 
     } 

     private static void process(int item) 
     { 
      Console.WriteLine("Processing " + item); 
      Thread.Sleep(2000); 
     } 
    } 
} 

如果你运行它,你会发现它非常快处理5个元素,然后有被处理的元素的一个数据块之前的延迟(由Thread.Sleep(2000)) 。这是因为在这个示例代码中,不能超过5个线程一次执行。

请注意,如果MaxDegreeOfParallelism超过线程池的最小线程值,则可能需要一段时间才能启动所有线程。

原因是Parallel.ForEach()使用线程池线程 - 并且默认情况下,线程池保持可用的线程数量是一定的。创建超出此限制的线程时,会在每个新线程池线程创建之间引入延迟。

您可以使用ThreadPool.SetMinThreads()将线程池线程的最小数量设置为更高的值,但我不建议这样做。

但是,如果你想这样做,这里的这台最小线程数到20的例子:

ThreadPool.GetMinThreads(out dummy, out ioThreads); 
ThreadPool.SetMinThreads(20, ioThreads); 

如果你这样做,然后用MaxDegreeOfParallelism = 20运行前面的代码,你会看到,初始线程创建时不再有任何延迟。

+0

如何处理'items'列表在执行过程中获取更多项目?我可以在一段时间内将整个Parallel.ForEach封装起来,在最坏的情况下,它会循环并重新开始一个新的...也许这就是最好的方法。 – JensB 2015-02-09 16:12:47

+0

你最后的陈述是不正确的。 'Parallel.Foreach'将愉快地请求比处理器更多的线程。然而,它仍然有一个“技巧”,“Parallel.Foreach'只会在第一次穿过主体时使用非常少的线程,每个循环都会添加更多线程,直到达到最大值。最后一段应该说“注意MaxDegreeOfParallelism设置是MAXIMUM值;'Parallel.Foreach'可能使用的线程少于您在其”预热“阶段开始时在最大值中指定的线程数。 – 2015-02-09 16:13:04

+0

我在循环内放置了一个断点,查看了工作线程的数量。 [第一击](http://i.stack.imgur.com/hknEI.png)主线程+2个工作线程(3个线程正在运行),[第二击](http://i.stack.imgur.com/ VjNg4.png)主线程+ 3个工作线程(4个线程正在运行),[第三次命中](http://i.stack.imgur.com/Hurb5.png)主线程+4个工作线程(5个线程正在运行)。在那之后,它停留在5个线程上。 – 2015-02-09 16:22:47

3

所以,让我首先说,你要做的只是给你一个非常具体的安排性能的小幅提高。尝试调整线程分配级别可能需要很多工作,因此在继续之前请确保您有一个非常好的理由。

现在,首先,如果你想简单地排队工作,你可以把它放在.NET线程池中。它只会将线程分配给最大配置,并且任何不适合这些线程的工作(如果所有线程都处于繁忙状态)将排队等待线程变为可用状态。

要做到这一点,最简单的方法是调用:

Task.Factory.StartNew(() => { /* Your code */}); 

这将创建一个TPL任务,并安排其在默认任务调度,这应该反过来任务分配给线程池中运行。

如果您需要等待这些任务,然后再继续完成,你可以将它们添加到集合,然后使用Task.WaitAll(...)

var tasks = new List<Task>(); 

tasks.Add(Task.Factory.StartNew(() => { /* Your code */})); 

// Before leaving the script. 
Task.WaitAll(tasks); 

但是,如果你需要去深入和控制这些调度任务,您可以查看创建支持有限并发性的自定义任务计划程序。 This MSDN article有关它的更多细节并建议可能的实现,但这不是一项简单的任务。

相关问题