我试图使用BlockingCollection<T>
实现生产者/消费者模式,所以我写了一个简单的控制台应用程序来测试它。如何正确使用BlockingCollection.GetConsumingEnumerable?
public class Program
{
public static void Main(string[] args)
{
var workQueue = new WorkQueue();
workQueue.StartProducingItems();
workQueue.StartProcessingItems();
while (true)
{
}
}
}
public class WorkQueue
{
private BlockingCollection<int> _queue;
private static Random _random = new Random();
public WorkQueue()
{
_queue = new BlockingCollection<int>();
// Prefill some items.
for (int i = 0; i < 100; i++)
{
//_queue.Add(_random.Next());
}
}
public void StartProducingItems()
{
Task.Run(() =>
{
_queue.Add(_random.Next()); // Should be adding items to the queue constantly, but instead adds one and then nothing else.
});
}
public void StartProcessingItems()
{
Task.Run(() =>
{
foreach (var item in _queue.GetConsumingEnumerable())
{
Console.WriteLine("Worker 1: " + item);
}
});
Task.Run(() =>
{
foreach (var item in _queue.GetConsumingEnumerable())
{
Console.WriteLine("Worker 2: " + item);
}
});
}
}
但是有3个问题,我的设计:
我不知道堵/守候在我的主要方法的正确途径。做一个简单的空while循环看起来非常低效,CPU使用浪费只是为了确保应用程序不会结束。
我的设计还存在另一个问题,在这个简单的应用程序中,我有一个生产者无限期地生产物品,并且不应该停下来。在真实世界的设置中,我希望它最终结束(例如,用完文件处理)。在那种情况下,我应该如何等待它在Main方法中完成?制作
StartProducingItems
async
然后await
呢?GetConsumingEnumerable
或Add
未按预期工作。生产者应该不断添加物品,但是它会添加一个物品,然后再也不会添加物品。这一个项目然后由其中一个消费者处理。两个消费者然后阻止等待物品被添加,但没有一个是。我知道Take
方法,但在while循环中再次旋转Take
看起来非常浪费和低效。有一个CompleteAdding
方法,但它不允许添加任何其他内容,如果尝试,则会引发异常,因此不适用。
我肯定知道这两个消费者其实阻塞和等待新的项目,我可以在调试过程中的线程之间进行切换:
编辑:
我我在其中一条评论中提出了修改建议,但Task.WhenAll
仍然立即返回。
public Task StartProcessingItems()
{
var consumers = new List<Task>();
for (int i = 0; i < 2; i++)
{
consumers.Add(Task.Run(() =>
{
foreach (var item in _queue.GetConsumingEnumerable())
{
Console.WriteLine($"Worker {i}: " + item);
}
}));
}
return Task.WhenAll(consumers.ToList());
}
'生产者应该不断添加物品,但它会添加一个物品,然后再也不会添加。这一个项目然后由其中一个消费者处理。这两个消费者然后阻止等待物品被添加,但没有一个是'呃,是的。你还期待什么?消费者被阻止直到物品被添加,这就是为什么它被称为阻塞收集 –
不,问题是生产者增加一个物品然后停止。 – user9993
这就是你编码的方式。您正在开始添加单个项目的任务。你错过了一个循环或什么? –