我试图围绕BlockingCollection和我的生产者/消费者问题围绕我的头。生产者/消费者,BlockingCollection,并正在等待更改
我想达到的目标,如下:
- 各种各样的线程安全队列在FIFO方式持有对象的列表(“工作”)。
- 第二个线程安全队列,它也以FIFO方式保存这些作业的结果列表。
换句话说:
Inbound "Job" Data, can come at any time from multiple threads
==> Thread-Safe FIFO Queue 1 "FQ1"
==> Async Processing of data in FQ1 (and remove item from FQ1)
==> Callback/Results into Thread-Safe FIFO Queue 2 "FQ2"
==> Async Processing of data in FQ2 (and remove item from FQ2)
==> Done
我卑微的尝试至今都:
private BlockingCollection<InboundObject> fq1;
private BlockingCollection<ResultObject> fq2;
(...)
Task.Factory.StartNew(() =>
{
foreach (InboundObject a in fq1.GetConsumingEnumerable())
a.DoWork(result => fq2.Add(result)); //a.DoWork spits out an Action<ResultObject>
}
一个我选择BlockingCollection是因为我想保持负荷降到最低的原因,意义只有当物品实际处于集合内部时才工作(而不处理等待/睡眠)。我不确定foreach是否正确。
请让我知道这是否正确,或者如果有更好的方法。谢谢!
编辑 我可以从单元测试中知道任务内部的工作实际上是同步的。新版本如下:
Task.Factory.StartNew(() =>
{
foreach (InboundObject a in fq1.GetConsumingEnumerable())
Task.Factory.StartNew(async() => { fq2.Add(await a.DoWork()); });
}
输入非常感谢!
如果.net4.5是一个选项,你无法比看TPL DataFlow做得更好。从BufferBlock(一个线程安全的异步队列)开始并向外工作。 http://www.microsoft.com/en-us/download/details.aspx?id=14782 – spender