Asked 2017-01-03

I am using version 3.0.1 of the Confluent Kafka connector. I created a new group subscribing to about 20 topics, most of which are busy. Unfortunately, when I start the Connect framework, the system cannot stop rebalancing: roughly every 2 minutes all topics rebalance again, and I do not know why. Some of the error messages are:

[2017-01-03 21:43:57,718] ERROR Commit of WorkerSinkTask{id=new-connector-0} offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:180) 
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) 
     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) 
     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) 
     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) 
     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:247) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:293) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:421) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:54) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:465) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:283) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:212) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:305) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142) 
     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) 
     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 

I do not know what could be causing the constant rebalancing.

I know that if a KafkaConsumer.poll() call takes longer than the configured timeout, Kafka revokes the partitions and a rebalance is triggered, but I am sure none of my polls takes that long. Can someone give me a clue?

Answers


I think max.poll.records can address this. The issue is not the timeout itself but the number of records that must be processed in each poll loop. 0.10 introduced max.poll.records, which places an upper bound on the number of records returned per call.
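As a minimal sketch of this suggestion: a Kafka Connect worker passes consumer settings through with a `consumer.` prefix, so capping the batch size might look like the fragment below (the value 100 is an illustrative assumption, not a recommendation):

```properties
# connect-worker.properties (fragment)
# Cap how many records a single poll() may return, so that each
# processing loop finishes well inside the session timeout.
# Tune 100 to your sink's per-record processing cost.
consumer.max.poll.records=100
```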

Also, per Confluent, consumer.poll() should have a fairly high session timeout, for example 30 to 60 seconds.

You might also want to fine-tune:

session.timeout.ms 
heartbeat.interval.ms 
max.partition.fetch.bytes 
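The knobs above could be set together in the worker configuration; the values below are illustrative starting points only, assuming the sink needs roughly 30 seconds of headroom per poll loop:

```properties
# connect-worker.properties (fragment)
# Allow longer gaps between polls before the broker considers the
# consumer dead (in 0.10.0 the session timeout also bounds the time
# between poll() calls).
consumer.session.timeout.ms=30000
# Heartbeats should fire several times per session timeout; about
# one third of it is the commonly cited rule of thumb.
consumer.heartbeat.interval.ms=10000
# Bound the data fetched per partition so a single poll cannot
# return an unmanageably large batch (1 MB here).
consumer.max.partition.fetch.bytes=1048576
```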

Yes, the rebalances happened when I took too long writing the poll results to HDFS. After I optimized my code, rebalances became rare. – wuchang


Consider upgrading to 0.10.1 or later, because the consumer was enhanced in those versions to better handle longer intervals between calls to poll().

If you are taking more than 5 minutes to put the results into HDFS, you can increase the new max.poll.interval.ms parameter. This will keep your consumer from being kicked out of the consumer group for failing to make progress.
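Under 0.10.1 semantics, that advice might translate into a worker fragment like the following; 10 minutes is an assumed processing ceiling, and request.timeout.ms is kept above max.poll.interval.ms as the release notes require:

```properties
# connect-worker.properties (fragment), Kafka >= 0.10.1
# Allow up to 10 minutes of HDFS writing between poll() calls.
consumer.max.poll.interval.ms=600000
# Must stay larger than max.poll.interval.ms, since this is how long
# a JoinGroup request may block on the server during a rebalance.
consumer.request.timeout.ms=610000
```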

The 0.10.1 release notes say:

The new Java consumer now supports heartbeating from a background thread. There is a new configuration max.poll.interval.ms which controls the maximum time between poll invocations before the consumer will proactively leave the group (5 minutes by default). The value of the configuration request.timeout.ms must always be larger than max.poll.interval.ms, because this is the maximum time that a JoinGroup request can block on the server while the consumer is rebalancing, so we have changed its default value to just above 5 minutes. Finally, the default value of session.timeout.ms has been adjusted down to 10 seconds, and the default value of max.poll.records has been changed to 500.