2

我有以下情况:生产者 - 消费者:怎么知道通知prodcution完成

  1. 阅读从数据库
  2. 数据做工作“计算”
  3. 结果写入数据库

我有一个线程从数据库读取并将生成的对象放入BlockingQueue。这些对象的体重非常大,因此队列限制了内存中对象的数量。 多个线程从队列中取对象,执行工作并将结果放入第二个队列中。 最终线程从第二个队列获取结果并将结果保存到数据库。

问题是如何防止死锁,例如。 “计算线程”需要知道何时不会有更多的对象被放入队列。 目前我通过传递线程(可调用)的引用来实现此目的,并在poll或offer之前检查thread.isDone(),然后如果元素为null。我还检查队列的大小,只要有元素,就必须消耗。使用take或put会导致死锁。

有没有更简单的方法来实现这一目标?

+0

可能是一个愚蠢的http://stackoverflow.com/questions/5326013/proper-implementation-of-producer-consumer-scenario-and-graceful-termination-of – 2011-09-06 05:19:45

回答

0

当你确定没有更多的任务要到队列中时,其中一种方法是将“虚拟”或“中毒”消息作为队列中的最后一条消息。例如,在把与db查询的最后一行有关的消息。因此,生产者在队列中放置了一个虚拟消息,消费者在接收到这个虚拟消息时就知道在这批中没有更多有意义的工作。

+0

我更像是一个生产者 - 消费者/生产者 - 消费者模式。中间部分消耗数据,然后将结果转发给其他消费者。问题是我有这个“中间部分”的多个实例。这是艰苦工作完成的地方。问题是“虚拟数据”可以比以前的“真实数据”更快地通过这条链,因为虚拟物上没有工作。 –

+0

虚拟数据或posion消息只是为了避免死锁情况;所以你知道没有更多的消息会到达。你也可以使用倒计时锁存器或其他方法来跟踪已经启动的任务的完成情况。 – Scorpion

0

也许你应该看看CompletionService

它被设计成执行相结合,在一个队列功能。 任务即完成执行将可以从完井服务通过

completionServiceInstance.take() 

然后,您可以再次使用其他执行人3.填写即与DB的结果,你将与来自completionServiceInstance采取的结果馈送。

+0

这不会帮助我,因为我没有传递大量任务,而是从数据库生成对象。我只有3个任务必须并行运行1.从数据库中读取2.执行工作3.将结果写入数据库。它们必须并行才能限制内存使用量,因为读写操作比实际工作快得多。 –