0

我试图使用BlockingCollection<T>实现生产者/消费者模式,所以我写了一个简单的控制台应用程序来测试它。如何正确使用BlockingCollection.GetConsumingEnumerable?

public class Program 
{ 
    public static void Main(string[] args) 
    { 
     var workQueue = new WorkQueue(); 
     workQueue.StartProducingItems(); 
     workQueue.StartProcessingItems(); 

     while (true) 
     { 

     } 
    } 
} 

public class WorkQueue 
{ 
    private BlockingCollection<int> _queue; 
    private static Random _random = new Random(); 

    public WorkQueue() 
    { 
     _queue = new BlockingCollection<int>(); 

     // Prefill some items. 
     for (int i = 0; i < 100; i++) 
     { 
      //_queue.Add(_random.Next()); 
     } 
    } 

    public void StartProducingItems() 
    { 
     Task.Run(() => 
     { 
      _queue.Add(_random.Next()); // Should be adding items to the queue constantly, but instead adds one and then nothing else. 
     }); 
    } 

    public void StartProcessingItems() 
    { 
     Task.Run(() => 
     { 
      foreach (var item in _queue.GetConsumingEnumerable()) 
      { 
       Console.WriteLine("Worker 1: " + item); 
      } 
     }); 

     Task.Run(() => 
     { 
      foreach (var item in _queue.GetConsumingEnumerable()) 
      { 
       Console.WriteLine("Worker 2: " + item); 
      } 
     }); 
    } 
} 

但是有3个问题,我的设计:

  • 我不知道堵/守候在我的主要方法的正确途径。做一个简单的空while循环看起来非常低效,CPU使用浪费只是为了确保应用程序不会结束。

  • 我的设计还存在另一个问题,在这个简单的应用程序中,我有一个生产者无限期地生产物品,并且不应该停下来。在真实世界的设置中,我希望它最终结束(例如,用完文件处理)。在那种情况下,我应该如何等待它在Main方法中完成?制作StartProducingItemsasync然后await呢?

  • GetConsumingEnumerableAdd未按预期工作。生产者应该不断添加物品,但是它会添加一个物品,然后再也不会添加物品。这一个项目然后由其中一个消费者处理。两个消费者然后阻止等待物品被添加,但没有一个是。我知道Take方法,但在while循环中再次旋转Take看起来非常浪费和低效。有一个CompleteAdding方法,但它不允许添加任何其他内容,如果尝试,则会引发异常,因此不适用。

我肯定知道这两个消费者其实阻塞和等待新的项目,我可以在调试过程中的线程之间进行切换:

enter image description here

编辑:

我我在其中一条评论中提出了修改建议,但Task.WhenAll仍然立即返回。

public Task StartProcessingItems() 
{ 
    var consumers = new List<Task>(); 

    for (int i = 0; i < 2; i++) 
    { 
     consumers.Add(Task.Run(() => 
     { 
      foreach (var item in _queue.GetConsumingEnumerable()) 
      { 
       Console.WriteLine($"Worker {i}: " + item); 
      } 
     })); 
    } 

    return Task.WhenAll(consumers.ToList()); 
} 
+0

'生产者应该不断添加物品,但它会添加一个物品,然后再也不会添加。这一个项目然后由其中一个消费者处理。这两个消费者然后阻止等待物品被添加,但没有一个是'呃,是的。你还期待什么?消费者被阻止直到物品被添加,这就是为什么它被称为阻塞收集 –

+0

不,问题是生产者增加一个物品然后停止。 – user9993

+0

这就是你编码的方式。您正在开始添加单个项目的任务。你错过了一个循环或什么? –

回答

4

GetConsumingEnumerable()正在阻塞。如果你想添加到队列不断,你应该把调用_queue.Add在一个循环:

public void StartProducingItems() 
{ 
    Task.Run(() => 
    { 
     while (true) 
      _queue.Add(_random.Next()); 
    }); 
} 

关于Main()方法,你可以调用Console.ReadLine()方法来防止主线程完成您按下前关键:

public static void Main(string[] args) 
{ 
    var workQueue = new WorkQueue(); 
    workQueue.StartProducingItems(); 
    workQueue.StartProcessingItems(); 

    Console.WriteLine("Press a key to terminate the application..."); 
    Console.ReadLine(); 
} 
+0

只要我按下回车键,它就会结束。一定有更好的方法。也许制作的StartProducingItems应该是异步的? – user9993

+0

@ user9993公开你在'StartProcessingItems'中创建的任务,然后在Main()中调用'.Wait()'。如果您有多个工作人员,则使用Task.WhenAll创建一个聚合任务,然后将其公开出来 –

+0

如果您从WorkQueue类中公开任务,您可以等待这些任务。但你如何告诉生产者停止生产? – mm8