在工作中,我们的一个进程使用SQL数据库表作为队列。我一直在设计一个队列阅读器来检查表中的排队工作,在工作开始时更新行状态,并在工作完成时删除行。我使用Parallel.Foreach
给每个进程自己的线程和设定MaxDegreeOfParallelism
到4Odd behavior with yield and Parallel.ForEach
当队列读取器启动时,它会检查任何未完成的工作,并加载工作纳入一个列表中,那么它就是一个Concat
与名单以及返回在无限循环中运行的IEnumerable
以检查新工作的方法。这个想法是,应该先处理未完成的工作,然后可以在线程可用的情况下工作。然而,我所看到的是FetchQueuedWork
会将队列表中的几十行更改为立即“处理”,但一次只能处理几个项目。
我想要发生的事情是FetchQueuedWork
只会在Parallel.Foreach
中打开一个槽时才会得到新的工作并更新表格。对我来说真的很奇怪,它的行为与我在本地开发人员环境中运行代码时的预期完全相同,但在生产中我遇到了上述问题。
我使用.NET 4以下是代码:
public void Go()
{
List<WorkData> unfinishedWork = WorkData.LoadUnfinishedWork();
IEnumerable<WorkData> work = unfinishedWork.Concat(FetchQueuedWork());
Parallel.ForEach(work, new ParallelOptions { MaxDegreeOfParallelism = 4 }, DoWork);
}
private IEnumerable<WorkData> FetchQueuedWork()
{
while (true)
{
var workUnit = WorkData.GetQueuedWorkAndSetStatusToProcessing();
yield return workUnit;
}
}
private void DoWork(WorkData workUnit)
{
if (!workUnit.Loaded)
{
System.Threading.Thread.Sleep(5000);
return;
}
Work();
}
这很有趣。不幸的是,我忘了提及我在.Net 4上,而这个功能只有4.5。 – 2014-09-23 02:43:47