2016-01-14 89 views

回答

30

此例外表明您正在以比它们可以发送更快的速度对记录进行排队。

当您调用send方法时,ProducerRecord将存储在内部缓冲区中以发送给代理。一旦ProducerRecord已被缓冲,该方法立即返回,不管它是否已被发送。

记录被分组为批次用于发送给代理,以减少每个消息被偷听的传输并提高吞吐量。

一旦一个记录添加一个批次,发送该批次的时间限制就会确保它在指定的时间内发送。这由Producer配置参数request.timeout.ms控制,默认值为30秒。

如果批处理已经排队超过超时限制,则会抛出异常。该批次中的记录将从发送队列中删除。

使用配置参数增加超时限制将允许客户端在到期之前将批次排队更长时间。

+0

我已经回复了您的评论,请告诉我,如果您有任何建议。 –

+0

我想知道是否将'batch.size'设置为0(或1和标准值之间的值)是否可以更好地解决问题? –

+0

Hi @JamesThomas,“表示你正在以比他们可以发送更快的速度排队记录”,如果我根本不想排队呢?我们的生产环境中会有大量流量,我们希望尽快发送数据。我们不希望看到这个过期。我们已将linger.ms设置为默认值,但仍然出现此问题。 你说增加request.timeout.ms会增加批量范围。 –

3

发送给代理之前控制时间的参数是linger.ms。其默认值是0(无延迟)。

+0

从原始问题可能不清楚我的情况发生了什么例如,我试图提供更多细节以使其更清晰。有关完整信息,请参阅下面的注释。这是一个问题,我排队记录比我的上传带宽更快。 –

22

我在完全不同的上下文中得到了这个异常。

我已经设置了zookeeper vm,broker vm和producer/consumer vm的小型集群。我打开了服务器(9092)和动物园管理员(2181)上的所有必需端口,然后尝试从消费者/发布者vm向代理发布消息。我得到了OP提到的异常,但由于我迄今为止只发布了一条消息(或者至少我尝试过),所以解决方案不能增加超时或批量大小。所以我搜索了一下,发现这个邮件列表描述了我在尝试使用消费者/生产者vm(ClosedChannelException)中的消息时遇到的类似问题:http://grokbase.com/t/kafka/users/152jsjekrm/having-trouble-with-the-simplest-remote-kafka-config 此邮件列表中的最后一篇文章实际上描述了如何解决该问题。

长话短说,如果你面对两个ChannelClosedExceptionBatch Expired例外,你可能需要更改此行的server.config文件中的以下并重新启动代理:

advertised.host.name=<broker public IP address> 

如果不是那么它会回落到host.name属性(它可能不会被设置),然后回退到Java类的规范主机名,最终当然不正确,因此会导致远程节点混淆。

+4

我可以证实这位先生的回答 – x4k3p

+0

听众= PLAINTEXT://域名:9092端口= 9092 host.name =本地主机 advertised.host.name = domain_name.i可以在本地发送消息,但是当我让卡夫卡服务器生效时!即时获取此异常“org.apache.kafka.common.errors.TimeoutException:批量过期 java.util.concurrent.ExecutionException”。我在哪里出错 –

+0

如果其他人正在查看,配置文件位于'/config/server.properties'中用于'0.10.2.1'。 – Edd

-1

当您创建使用者设置ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG为true时。

1

我正在使用Kafka Java客户端版本0.11.0.0。我也开始看到没有一致地产生大量消息的相同模式。它传递了很少的信息,并失败了一些其他人。 (虽然通过和失败的邮件都是相同的大小)。在我的情况下,每封邮件大小约60KB,这远高于卡夫卡的默认batch.size 16kB,我的linger.ms被设置为默认值0.此错误正在因为生产者客户端在它能够从服务器接收到成功的响应之前超时。基本上,在我的代码中,这个呼叫超时:kafkaProd.send(pr).get()。为了解决这个问题,我不得不将生产者客户端的默认request.timeout.ms增加到60000

相关问题