2012-07-15 68 views
5
  • 我有一个I/O密集型操作。
  • 我只想要一次运行5个线程的MAX。
  • 我有8000个任务要排队和完成。
  • 每个任务大约需要15-20秒才能执行。

我在线程池四周看了看,但是ThreadPool挫败 - 线程创建超过SetMaxThreads

 ThreadPool.SetMaxThreads(5, 0); 

     List<task> tasks = GetTasks(); 

     int toProcess = tasks.Count; 
     ManualResetEvent resetEvent = new ManualResetEvent(false); 

     for (int i = 0; i < tasks.Count; i++) 
     { 
      ReportGenerator worker = new ReportGenerator(tasks[i].Code, id); 
      ThreadPool.QueueUserWorkItem(x => 
      { 
       worker.Go(); 
       if (Interlocked.Decrement(ref toProcess) == 0) 
        resetEvent.Set(); 
      }); 
     } 

     resetEvent.WaitOne(); 

我想不通为什么......我的代码是在同一时间执行超过5个线程。我试过setmaxthreads,setminthreads,但它一直执行超过5个线程。

发生了什么事?我错过了什么?我应该以另一种方式来做这件事吗?

谢谢

+0

您已经在调试器中验证了** tasks.Count **的值吗?你有没有尝试过把“5”放进去? – 2012-07-15 02:47:03

+0

任务数组中有〜8000个对象 – Mike 2012-07-15 02:48:00

回答

3

任务并行库可以帮助您:

List<task> tasks = GetTasks(); 

Parallel.ForEach(tasks, new ParallelOptions { MaxDegreeOfParallelism = 5 }, 
    task => {ReportGenerator worker = new ReportGenerator(task.Code, id); 
      worker.Go();}); 

What does MaxDegreeOfParallelism do?

+0

这很简单!像魅力一样工作!谢谢! – Mike 2012-07-15 03:37:07

1

我认为有一个不同的,更好的方法来解决这个问题。 (请原谅,如果我不小心Java化了一些语法)

这里的主线程有一个在“任务”中要做的事情列表 - 而不是为每个任务创建线程,当你有这么多的项目,创建所需的线程数量,然后让他们根据需要从列表中请求任务。

第一件要做的事情就是在这个代码来自的类中添加一个变量,用作指向列表的指针。我们还将添加一个用于最大期望线程数。

// New variable in your class definition 
private int taskStackPointer; 
private final static int MAX_THREADS = 5; 

创建一个方法,该方法返回列表中的下一个任务并递增堆栈指针。然后创建了这个新的接口:

// Make sure that only one thread has access at a time 
[MethodImpl(MethodImplOptions.Synchronized)] 
public task getNextTask() 
{ 
    if(taskStackPointer < tasks.Count) 
     return tasks[taskStackPointer++]; 
    else 
     return null; 
} 

或者,你可以返回任务[taskStackPointer ++]代码,如果有,你可以指定为表示“列表结束”的值。然而,这样做可能更容易。

接口:

public interface TaskDispatcher 
{ 
    [MethodImpl(MethodImplOptions.Synchronized)] public task getNextTask(); 
} 

内ReportGenerator类,改变构造函数接受调度对象:

public ReportGenerator(TaskDispatcher td, int idCode) 
{ 
    ... 
} 

您还需要改变ReportGenerator类以便处理有一个外部循环,它通过调用td.getNextTask()来请求一个新任务,并在返回NULL时退出循环。

最后,改变线程创建代码是这样的:(这只是给你一个想法)

taskStackPointer = 0; 
for (int i = 0; i < MAX_THREADS; i++) 
{ 
    ReportGenerator worker = new ReportGenerator(this,id); 
    worker.Go(); 
} 

创建线程的所需数量的方式,让他们都在最大容量工作。

(我不确定我是否使用了“[MethodImpl(MethodImplOptions.Synchronized)]”正好...我更习惯的Java比C#)

+0

感谢您花时间回答我的问题,它确实有道理。尽管如此,这种方法更加冗长:P – Mike 2012-07-15 03:36:45

+0

它可能稍微冗长一些,但是一旦找到它,它就会非常高效且易于理解。 – 2012-07-15 03:49:21

1

你的任务列表将在它8K项,因为你说的代码放在他们那里:

List<task> tasks = GetTasks(); 

这就是说,这个数字已经没有任何关系多少线程在这个意义上被使用,调试器总是要告诉你的广告有多少项目代表名单。

有多种方法可以确定正在使用多少个线程。也许最简单的一个就是用调试器打入应用程序并查看线程窗口。不仅你会得到一个计数,但你会看到每个线程正在做什么(或不是),这导致我...

有关于你的任务正在做什么以及你如何到达的重要讨论在一个数字'节流'的线程池。在大多数情况下,线程池将会做正确的事情。

现在来回答您的具体问题......

要明确控制并发任务的数量,考虑一个简单的实现,将涉及从清单BlockingCollection改变你的任务集合(也将在内部使用ConcurrentQueue)和下面的代码为“消费”的工作:

var parallelOptions = new ParallelOptions 
{ 
    MaxDegreeOfParallelism = 5 
}; 

Parallel.ForEach(collection.GetConsumingEnumerable(), options, x => 
{ 
    // Do work here... 
}); 

变化MaxDegreeOfParallelism,无论你已经确定并发值是适合你正在做的工作。

以下可能是你的兴趣:

Parallel.ForEach Method

BlockingCollection

克里斯

3

有在那个SetMaxThreads的限制,你不能把它比数较低处理器在系统上。如果您有8个处理器,将其设置为5与完全不调用该函数相同。