Drew是对的,我认为ConcurrentQueue即使听起来很完美,但实际上是BlockingCollection使用的底层数据结构。对我来说,似乎也非常重要。 查看本书的第7章* http://www.amazon.co.uk/Parallel-Programming-Microsoft-NET-Decomposition/dp/0735651590/ref=sr_1_1?ie=UTF8&qid=1294319704&sr=8-1 它将解释如何使用BlockingCollection,并让多个生产者和多个消费者各自脱离“队列”。你会想看看“GetConsumingEnumerable()”方法,可能只是调用.ToObservable()。
*本书的其余部分相当平均。
编辑:
这里是一个示例程序,我认为做你想要的?
class Program
{
private static ManualResetEvent _mre = new ManualResetEvent(false);
static void Main(string[] args)
{
var theQueue = new BlockingCollection<string>();
theQueue.GetConsumingEnumerable()
.ToObservable(Scheduler.TaskPool)
.Subscribe(x => ProcessNewValue(x, "Consumer 1", 10000000));
theQueue.GetConsumingEnumerable()
.ToObservable(Scheduler.TaskPool)
.Subscribe(x => ProcessNewValue(x, "Consumer 2", 50000000));
theQueue.GetConsumingEnumerable()
.ToObservable(Scheduler.TaskPool)
.Subscribe(x => ProcessNewValue(x, "Consumer 3", 30000000));
LoadQueue(theQueue, "Producer A");
LoadQueue(theQueue, "Producer B");
LoadQueue(theQueue, "Producer C");
_mre.Set();
Console.WriteLine("Processing now....");
Console.ReadLine();
}
private static void ProcessNewValue(string value, string consumerName, int delay)
{
Thread.SpinWait(delay);
Console.WriteLine("{1} consuming {0}", value, consumerName);
}
private static void LoadQueue(BlockingCollection<string> target, string prefix)
{
var thread = new Thread(() =>
{
_mre.WaitOne();
for (int i = 0; i < 100; i++)
{
target.Add(string.Format("{0} {1}", prefix, i));
}
});
thread.Start();
}
}
您能详细说明您期望Rx如何帮助您吗? – 2010-11-10 08:28:25
@Richard Szalay - 正如我在接近尾声时所提到的,我的想法是,我不必轮询查看是否有任何东西在队列中,我可以在有东西放在那里时做出反应,所以如果有大量项目突然推入,我可能有几个线程正在处理。我试图避免投票,这就是我现在正在做的事情。 – 2010-11-10 13:22:01