2014-09-23 50 views
1

在工作中,我们的一个进程使用SQL数据库表作为队列。我一直在设计一个队列阅读器来检查表中的排队工作,在工作开始时更新行状态,并在工作完成时删除行。我使用Parallel.Foreach给每个进程自己的线程和设定MaxDegreeOfParallelism到4Odd behavior with yield and Parallel.ForEach

当队列读取器启动时,它会检查任何未完成的工作,并加载工作纳入一个列表中,那么它就是一个Concat与名单以及返回在无限循环中运行的IEnumerable以检查新工作的方法。这个想法是,应该先处理未完成的工作,然后可以在线程可用的情况下工作。然而,我所看到的是FetchQueuedWork会将队列表中的几十行更改为立即“处理”,但一次只能处理几个项目。

我想要发生的事情是FetchQueuedWork只会在Parallel.Foreach中打开一个槽时才会得到新的工作并更新表格。对我来说真的很奇怪,它的行为与我在本地开发人员环境中运行代码时的预期完全相同,但在生产中我遇到了上述问题。

我使用.NET 4以下是代码:

public void Go() 
{ 
    List<WorkData> unfinishedWork = WorkData.LoadUnfinishedWork(); 
    IEnumerable<WorkData> work = unfinishedWork.Concat(FetchQueuedWork());  
    Parallel.ForEach(work, new ParallelOptions { MaxDegreeOfParallelism = 4 }, DoWork); 
} 

private IEnumerable<WorkData> FetchQueuedWork() 
{ 
    while (true) 
    { 
     var workUnit = WorkData.GetQueuedWorkAndSetStatusToProcessing(); 
     yield return workUnit; 
    } 
} 

private void DoWork(WorkData workUnit) 
{ 
    if (!workUnit.Loaded) 
    { 
     System.Threading.Thread.Sleep(5000); 
     return; 
    } 
    Work(); 
} 

回答

3

我怀疑是默认行为是缓冲输入(发行模式?)。手了 - 当谈到.NET 4.5

List<WorkData> unfinishedWork = WorkData.LoadUnfinishedWork(); 
IEnumerable<WorkData> work = unfinishedWork.Concat(FetchQueuedWork());  
var options = new ParallelOptions { MaxDegreeOfParallelism = 4 }; 
var partitioner = Partitioner.Create(work, EnumerablePartitionerOptions.NoBuffering); 
Parallel.ForEach(partioner, options, DoWork); 
+0

这很有趣。不幸的是,我忘了提及我在.Net 4上,而这个功能只有4.5。 – 2014-09-23 02:43:47

3

Blorgbeard的解决方案是正确的:您可能需要创建自己的分区,并通过它的NoBuffering选项。

如果你被限制到.NET 4,你有几种选择:

  • 替换您Parallel.ForEachwork.AsParallel().WithDegreeOfParallelism(4).ForAll(DoWork)。 PLINQ在缓冲物品方面更加保守,所以这应该是个诀窍。

  • 写你自己的枚举分区器(祝你好运)。

  • 创建一个难看的信号量劈像这样:

副作用的用于简洁起见Select

public void Go() 
{ 
    const int MAX_DEGREE_PARALLELISM = 4; 

    using (var semaphore = new SemaphoreSlim(MAX_DEGREE_PARALLELISM, MAX_DEGREE_PARALLELISM)) 
    { 
     List<WorkData> unfinishedWork = WorkData.LoadUnfinishedWork(); 

     IEnumerable<WorkData> work = unfinishedWork 
      .Concat(FetchQueuedWork()) 
      .Select(w => 
      { 
       // Side-effect: bad practice, but easier 
       // than writing your own IEnumerable. 
       semaphore.Wait(); 

       return w; 
      }); 

     // You still need to specify MaxDegreeOfParallelism 
     // here so as not to saturate your thread pool when 
     // Parallel.ForEach's load balancer kicks in. 
     Parallel.ForEach(work, new ParallelOptions { MaxDegreeOfParallelism = MAX_DEGREE_PARALLELISM }, workUnit => 
     { 
      try 
      { 
       this.DoWork(workUnit); 
      } 
      finally 
      { 
       semaphore.Release(); 
      } 
     }); 
    } 
}