2010-06-13 57 views
5

我意识到,当我尝试使用多个线程处理并发队列中的项目时,多个线程可以将项目放入其中,理想的解决方案是使用Reactive Extensions与并发数据结构。如何在ConcurrentQueue或ConcurrentStack中使用IObservable/IObserver

我原来的问题是:

While using ConcurrentQueue, trying to dequeue while looping through in parallel

所以我很好奇,如果有什么办法可以有作为的项目投入它会连续出列一个LINQ(PLINQ或)查询。

我试图让这个工作的方式,我可以有n个生产者推入队列和有限数量的线程来处理,所以我不会超载数据库。

如果我可以使用Rx框架,那么我希望我可以开始它,并且如果100个项目被放置在100ms内,那么作为PLINQ查询的一部分的20个线程将仅通过队列进行处理。

有三种技术,我想一起工作:

  1. 的Rx框架(无LINQ)
  2. PLING
  3. System.Collections.Concurrent 结构
+0

您能详细说明您期望Rx如何帮助您吗? – 2010-11-10 08:28:25

+0

@Richard Szalay - 正如我在接近尾声时所提到的,我的想法是,我不必轮询查看是否有任何东西在队列中,我可以在有东西放在那里时做出反应,所以如果有大量项目突然推入,我可能有几个线程正在处理。我试图避免投票,这就是我现在正在做的事情。 – 2010-11-10 13:22:01

回答

3

我不不知道如何用Rx完成这项工作,但我建议只使用BlockingCollection<T>producer-consumer pattern。您的主线程将项目添加到集合中,默认情况下,该集合使用下面的ConcurrentQueue<T>。然后,您有一个单独的Task,您在使用Parallel::ForEach而不是BlockingCollection<T>之前旋转起来,以便从系列中同时处理多个项目,以便同时处理系统。现在,您可能还需要考虑使用ParallelExtensions库的GetConsumingPartitioner方法,以便提高效率,因为在这种情况下,默认分区程序会创建比您想要的更多的开销。你可以从this blog post了解更多。

当主线程完成你的BlockingCollection<T>Task::WaitCompleteAddingTask你纺最多等待所有消费者处理完集合中的所有项目。

+0

使用'BlockingCollection'的主要方法是消耗线程块。一个可观察的模式只会在需要处理的时候占用线程。 – 2014-03-21 18:49:32

6

Drew是对的,我认为ConcurrentQueue即使听起来很完美,但实际上是BlockingCollection使用的底层数据结构。对我来说,似乎也非常重要。 查看本书的第7章* http://www.amazon.co.uk/Parallel-Programming-Microsoft-NET-Decomposition/dp/0735651590/ref=sr_1_1?ie=UTF8&qid=1294319704&sr=8-1 它将解释如何使用BlockingCollection,并让多个生产者和多个消费者各自脱离“队列”。你会想看看“GetConsumingEnumerable()”方法,可能只是调用.ToObservable()。

*本书的其余部分相当平均。

编辑:

这里是一个示例程序,我认为做你想要的?

class Program 
{ 
    private static ManualResetEvent _mre = new ManualResetEvent(false); 
    static void Main(string[] args) 
    { 
     var theQueue = new BlockingCollection<string>(); 
     theQueue.GetConsumingEnumerable() 
      .ToObservable(Scheduler.TaskPool) 
      .Subscribe(x => ProcessNewValue(x, "Consumer 1", 10000000)); 

     theQueue.GetConsumingEnumerable() 
      .ToObservable(Scheduler.TaskPool) 
      .Subscribe(x => ProcessNewValue(x, "Consumer 2", 50000000)); 

     theQueue.GetConsumingEnumerable() 
      .ToObservable(Scheduler.TaskPool) 
      .Subscribe(x => ProcessNewValue(x, "Consumer 3", 30000000)); 


     LoadQueue(theQueue, "Producer A"); 
     LoadQueue(theQueue, "Producer B"); 
     LoadQueue(theQueue, "Producer C"); 

     _mre.Set(); 

     Console.WriteLine("Processing now...."); 

     Console.ReadLine(); 
    } 

    private static void ProcessNewValue(string value, string consumerName, int delay) 
    { 
     Thread.SpinWait(delay); 
     Console.WriteLine("{1} consuming {0}", value, consumerName); 
    } 

    private static void LoadQueue(BlockingCollection<string> target, string prefix) 
    { 
     var thread = new Thread(() => 
            { 
             _mre.WaitOne(); 
             for (int i = 0; i < 100; i++) 
             { 
              target.Add(string.Format("{0} {1}", prefix, i)); 
             } 
            }); 
     thread.Start(); 
    } 
} 
+0

这实际上是......巧妙的人......连接Rx和BlockingCollection。哇...你甚至可以用这个东西做一个管道:https://msdn.microsoft.com/en-us/library/ff963548.aspx – Oooogi 2017-02-07 09:29:41