2009-04-28 161 views
5

我有什么,我认为是一个很常见的线程的情景:线程队列傻瓜

  • 我有100页相同的作业完成
  • 所有作业是相互独立的各个 其他
  • 我要处理同时
  • 由于每个作业 完成最多的 15的工作,新的工作将开始 ,直到所有的工作已经完成

如果您假设每个作业在他完成时(我使用BackgroundWorker类)都会触发一个事件,我可以想到几种方法来解决这个问题,但我不确定“正确“的解决方案是。我希望你们中的一些专家能指出我的方向。

解决方案1:有一个while(continue){Threading.Sleep(1000);在我的Main()函数中循环}。当Job_Completed事件处理程序中的代码将设置continue = false时A)没有作业剩余排队和B)所有排队的作业已完成。我之前和之后都使用过这个解决方案,看起来工作正常......对我来说似乎有点“奇怪”。

解决方案2:在我的Main()函数中使用Application.Run()。同样,Job_Completed事件处理程序中的代码会在A)没有作业保留排队时,调用Application.Exit(),并且B)所有排队的作业都已完成。解决方案3:使用ThreadPool,排队所有500-1000个请求,让他们一次运行10个(SetMaxThreads),并以某种方式等待它们全部完成。

在所有这些解决方案中,基本思想是每次完成另一项工作时都会开始一项新工作,直到没有剩余工作。所以,问题不仅在于等待现有的工作完成,而且还在等待直到不再有待处理的工作开始。如果ThreadPool是正确的解决方案,那么等待ThreadPool完成所有排队项目的正确方法是什么?

我想我最重要的困惑在于,我不明白HOW事件是否能够从我的Main()函数中触发。显然他们这样做,我只是不明白从Windows消息循环的角度来看它的机制。解决这个问题的正确方法是什么?为什么?

+0

看起来大多数建议都围绕着一个ThreadPool风格的解决方案......我上面提出的解决方案1和解决方案2(基本上等待被正在触发的事件修改的条件)呢?这样做是否有内在的错误,还是仅仅是因为.NET提供了ThreadPool,所以它没有必要? 看起来很奇怪的代码是这样的: while(continue) Threading.Sleep(1000); ...等待事件在Main()函数中触发。在像这样的代码中......当我的事件得到处理时......在Sleep()调用中的某个地方? – 2009-04-28 15:10:55

回答

0

这里是我将如何接近它的伪代码(这不利用线程池,所以有人可能有一个更好的答案:)

main 
{ 
    create queue of 100 jobs 
    create new array of 15 threads 
    start threads, passing each the job queue 
    do whatever until threads are done 
} 

thread(queue) 
{ 
    while(queue isn't empty) 
    { 
     lock(queue) { if queue still isn't empty dequeue a thing } 
     process the thing 
    } 

    queue is empty so exit thread 
} 

编辑:如果你的问题是如何知道何时线程完成,并且您正在使用正常的C#线程(不是ThreadPooled线程),您可以使用可选超时在每个线程上调用Thread.Join(),并且只有在线程完成后才会返回。如果你想保持跟踪有多少线程而不会挂了一个完成,可以循环通过他们在这样的方式:

for(int i = 0; allThreads.Count > 0; i++) 
{ 
    var thisThread = allThreads[i % threads.Count]; 
    if(thisThread.Join(timeout)) // something low, maybe 100 ms or something 
     allThreads.Remove(thisThread); 
} 
2

回复:“莫名其妙地等待他们全部完成”

ManualResetEvent是你的朋友,在你开始你的大批量创建这些小狗之一之前,在你的主线程中等待它,在作业完成后将它设置在后台操作的末尾。

另一种方法是手动创建线程,做一个foreach线程的Thread.join()

你可以使用这个(我在测试过程中使用)

 private void Repeat(int times, int asyncThreads, Action action, Action done) { 
     if (asyncThreads > 0) { 

      var threads = new List<Thread>(); 

      for (int i = 0; i < asyncThreads; i++) { 

       int iterations = times/asyncThreads; 
       if (i == 0) { 
        iterations += times % asyncThreads;      
       } 

       Thread thread = new Thread(new ThreadStart(() => Repeat(iterations, 0, action, null))); 
       thread.Start(); 
       threads.Add(thread); 
      } 

      foreach (var thread in threads) { 
       thread.Join(); 
      } 

     } else { 
      for (int i = 0; i < times; i++) { 
       action(); 
      } 
     } 
     if (done != null) { 
      done(); 
     } 
    } 

用法:

// Do something 100 times in 15 background threads, wait for them all to finish. 
Repeat(100, 15, DoSomething, null) 
1

我只是使用任务并行库。

您可以将它作为一个单一的,简单的Parallel.For循环与您的任务,它会自动管理这相当干净。如果您不能等待C#4和Microsoft的实现,临时解决方法是编译并使用Mono Implementation of TPL。 (我个人比较喜欢MS的实现,特别是较新的beta版本,但是Mono是当今的功能和可再发行版本。)

0

当您在线程队列中排队工作项时,应该返回一个等待句柄。把它们放在一个数组中,你可以把它作为参数传递给WaitAll()函数。

+0

好主意,但你会如何做到这一点? QueueUserWorkItem返回一个布尔值。 – Grokys 2009-04-28 01:14:13

1

我会使用ThreadPool。

在开始运行作业之前,请创建一个ManualResetEvent和一个int计数器。将每个作业添加到ThreadPool,每次递增计数器。

在每项工作结束时,递减计数器,当计数器达到零时,在事件上调用Set()

在您的主线程中,请致电WaitOne()等待所有作业完成。

3

即使其他答案很好,如果你想要另一个选项(你永远不能有足够的选择),那么这是一个想法。

只需将每个作业的数据放入FIFO堆栈中的结构即可。

创建15个主题。

每个线程都会从堆栈中获取下一个作业,并将其弹出。

当一个线程完成处理时,获取下一个工作,如果堆栈为空,则线程死亡或者只是睡眠,等待。

唯一的解决办法很简单,就是让弹出窗口处于关键部分(同步读取/弹出窗口)。

0

ThreadPool可能是要走的路。 SetMaxThreads方法将能够限制正在执行的线程数。但是,这会限制进程/ AppDomain的最大线程数。如果进程作为服务运行,我不会建议使用SetMaxThreads

private static ManualResetEvent manual = new ManualResetEvent(false); 
private static int count = 0; 

public void RunJobs(List<JobState> states) 
{ 
    ThreadPool.SetMaxThreads(15, 15); 

    foreach(var state in states) 
    { 
      Interlocked.Increment(count); 
      ThreadPool.QueueUserWorkItem(Job, state); 
    } 

    manual.WaitOne(); 
} 

private static void Job(object state) 
{ 
    // run job 
    Interlocked.Decrement(count); 
    if(Interlocked.Read(count) == 0) manual.Set(); 
} 
0

微软的反应框架是一流此:

Action[] jobs = new Action[100]; 

var subscription = 
    jobs 
     .ToObservable() 
     .Select(job => Observable.Start(job)) 
     .Merge(15) 
     .Subscribe(
      x => Console.WriteLine("Job Done."), 
      () => Console.WriteLine("All Jobs Done.")) 

完成。

Just NuGet“System.Reactive”。