2017-12-27 363 views
1

更新:Java的嵌套队列

总之,我有一个消息发送从用户到服务的队列。 但是,当一个服务获得1000条消息时,比队列由 支配,它和其他服务在队列为空之前不会收到消息。我需要这种方法的解决方案。

我对我的BlockingQueue有问题。比方说,我有100个外部服务,我负责向他们发送消息来自用户。

用户 - >我 - >第三方。

我在我的应用程序中使用队列。当其中一个第三方服务不可用时发生问题:

  • 从用户接收到任何第三方的消息。将它们保存在同一个队列中可以说Q1。
  • 消费队列,识别第三方服务并发送给对应的第三方。
  • 第三方之一不响应,超时。
  • 保存无法将它们传送到数据库的消息。
  • 定期从数据库获取未发送的消息并将它们放入我的Q1。
  • 如果该第三方获得大量流量,可以说5000个消息(填满我的队列)比其他服务的非消息从我那里收到消息,直到此队列为空。但是当这个队列为空时,他们将不会收到消息,因为我将再次从DB获取消息并将它们放入队列中,其他服务将再次被阻塞。

Solutiouns,我认为到目前为止:

  • 使用嵌套队列的业务ID,所以我会在一次5000级的消息找出封锁的服务,并将它们保存到数据库没有5000次消费从队列中的消息(可更因为消息将不会是为了,他们都是一起与其他服务于一体的队列)

如何有效地使用嵌套concurret队列(这是我使用的BlockingQueue),或者你认为此方案更好的解决方案?基本上我不想让第三方下降支配我的队列。我需要一个聪明的方法来分隔这些队列,可能会有成千上万的队列,因此为每个队列创建一个队列可能是无效的。

+0

你有没有考虑过使用像RxJava这样的响应式框架? –

+0

你可以标记出现超时的那些,并且不要将它们放回队列中以增加时间。比如,第一次你只在5秒后才把它们放回去,第二次它们会出来10,然后是20,40,一分钟,两分钟等等。你明白了。为了不让系统空闲,如果队列为空,则可以忽略上述情况。 –

+0

我必须先研究RxJava,谢谢你的建议。我已经有超时映射,它的工作原理就好像X个消息是超时消息,而不是服务Y的新消息写入数据库而不是队列,并且将被阻塞Z次。经过Z次后,如果它仍然被阻塞,则新消息被阻塞,直到Z + 1次等。但是另一方面,这使得该服务在DB中等待处理大量消息。从数据库中检索这些消息以进行处理时,比我的队列再次被阻塞。 – cmlonder

回答

0

看来您正在使用单个BlockingQueue来保留所有第三方服务的消息。

相反,

  • 使用单独阻塞队列和队列消费者对各第三方服务
  • 当您从用户那里得到的,而不是在队列中添加它的消息,确认该第三方服务有本身并相应地将消息添加到第三方服务专用阻止队列中。