1

我试图围绕BlockingCollection和我的生产者/消费者问题围绕我的头。生产者/消费者,BlockingCollection,并正在等待更改

我想达到的目标,如下:

  • 各种各样的线程安全队列在FIFO方式持有对象的列表(“工作”)。
  • 第二个线程安全队列,它也以FIFO方式保存这些作业的结果列表。

换句话说:

Inbound "Job" Data, can come at any time from multiple threads 
    ==> Thread-Safe FIFO Queue 1 "FQ1" 
     ==> Async Processing of data in FQ1 (and remove item from FQ1) 
     ==> Callback/Results into Thread-Safe FIFO Queue 2 "FQ2" 
      ==> Async Processing of data in FQ2 (and remove item from FQ2) 
       ==> Done 

我卑微的尝试至今都:

private BlockingCollection<InboundObject> fq1; 
private BlockingCollection<ResultObject> fq2; 

(...) 

Task.Factory.StartNew(() => 
{ 
    foreach (InboundObject a in fq1.GetConsumingEnumerable()) 
     a.DoWork(result => fq2.Add(result)); //a.DoWork spits out an Action<ResultObject> 
} 

一个我选择BlockingCollection是因为我想保持负荷降到最低的原因,意义只有当物品实际处于集合内部时才工作(而不处理等待/睡眠)。我不确定foreach是否正确。

请让我知道这是否正确,或者如果有更好的方法。谢谢!

编辑 我可以从单元测试中知道任务内部的工作实际上是同步的。新版本如下:

Task.Factory.StartNew(() => 
{ 
    foreach (InboundObject a in fq1.GetConsumingEnumerable()) 
     Task.Factory.StartNew(async() => { fq2.Add(await a.DoWork()); }); 
} 

输入非常感谢!

+2

如果.net4.5是一个选项,你无法比看TPL DataFlow做得更好。从BufferBlock(一个线程安全的异步队列)开始并向外工作。 http://www.microsoft.com/en-us/download/details.aspx?id=14782 – spender

回答

1

我选择BlockingCollection的原因之一是因为我希望将负载保持在最低限度,这意味着只有在物品实际位于集合内时才能工作(而不处理等待/睡眠)。我不确定foreach是否正确。

这是正确的方法,foreach将被阻塞,直到新的项目将被添加到队列或CompleteAdding方法将被调用。不正确的是你想用BlockingCollection实现异步处理。 BlockingCollection是一个简单的生产者/消费者队列,必须在需要维护作业和作业结果处理的顺序时使用。因为它是同步的。工作将按照添加的顺序进行处理。

如果您只需要异步执行,则不需要队列。在这种情况下,您可以使用TPL,为每个作业创建一个新任务,他们将在内部由TPL排队,并将使用尽可能多的OS线程,因为系统可以高效地处理这些线程。例如,你的工作可以产生他们自己的任务。这是更灵活的方法。

另外,可以使用生产者/消费者队列来组织管道作业的执行。在这种情况下,工作必须分成几个步骤。每个步骤都必须由专用线程执行。在每个作业步骤线程中,我们必须从一个队列中读取作业,执行此作业,并将其排入下一个队列。

interface IJob 
{ 
    void Step1(); 
    void Step2(); 
    ... 
} 

var step1 = new BlockingCollection<IJob>(); 
var step2 = new BlockingCollection<IJob>(); 
... 

Task.Factory.StartNew(() => 
    { 
     foreach(var step in step1.GetConsumingEnumerable()) { 
      step.Step1(); 
      step2.Add(step); 
     } 
    }); 

Task.Factory.StartNew(() => 
    { 
     foreach(var step in step2.GetConsumingEnumerable()) { 
      // while performing Step2, another thread can execute Step1 
      // of the next job 
      step.Step2(); 
      step3.Add(step); 
     } 
    }); 

在这种情况下,作业将按先进先出顺序执行,但并行执行。 但是如果你想做流水线处理,你首先要考虑负载均衡。如果其中一个步骤花费太多时间,则队列将变大,而其他线程大部分时间都将空闲。