我的应用程序从RabbitMQ消费一些消息并处理它们。 我有大约10个队列,每个队列有多达10个消费者(线程)。 我有一个预取5.我使用CloudAMQP插件(RabbitMQ作为服务)在Heroku中运行我的设置。RabbitMQ java客户端停止消费消息
我正在运行默认的心跳和连接超时设置(60秒)。
我的java应用程序是使用spring-rabbit库的spring引导应用程序。
版本:
RabbitMQ 3.5.3
Erlang 17.5.3
Java 1.8
Spring boot 1.3.2.RELEASE
Spring rabbit 1.5.3.RELEASE
的问题是,对于消费者一个特定的队列停止消费消息的一段时间后。当我重新启动我的Java应用程序时,一切正常。其他队列正在被正常使用。在应用程序方面没有错误。在兔子身边的日志流中,我看到一些条目,如
= REPORT==== 2016-08-02 15:53:32 UTC ===
closing AMQP connection <SOMETHING> (SOMETHING_ELSE -> SOMETHING_ELSE_ELSE):
{heartbeat_timeout,running}
我无法在Heroku中本地或在测试环境中进行复制。
更新
下面的代码可以在AMQConnection.class
int heartbeat = negotiatedMaxValue(this.requestedHeartbeat,
connTune.getHeartbeat());
private static int negotiatedMaxValue(int clientValue, int serverValue) {
return (clientValue == 0 || serverValue == 0) ?
Math.max(clientValue, serverValue) :
Math.min(clientValue, serverValue);
}
发现我不能增加超过60秒的心跳值(这是我从服务器获取)。
你好@rdegges。感谢您的帮助。该计划足够大来处理所有的连接。此外(这是我的不好,我不清楚)消费者最初都在工作,但偶尔这个特定队列的消费者会停止消费。如果我重新启动应用程序,那么一切正常,消费者再次开始工作。我不能重现它。我的应用程序运行在一台大型机器上(PL Web 1)。我熟悉heroku infra的性质(重新启动等),应用程序可以处理这个问题。 – alkis
我在想这可能是由于过载与心跳配置相结合导致的。我正在等待这种情况再次发生,看看这个特定的消费者连接是否被兔子丢弃,但消费者仍然没有意识到(这将解释没有任何错误日志)。如果发生这种情况,那么将心跳更改为更大的值可能会修复它。但我不知道这是否可能。检查我的更新。 – alkis
啊,这使诊断更加棘手。特别是如果它不能很容易地复制:(但是,消费者应该重新启动连接,如果它下降。你的心跳配置看起来很好给我 – rdegges