2017-02-19 99 views
2

当源主题分区count = 1时正常工作。如果我将分区上升到大于1的任何值,我会看到下面的错误。适用于低级别以及DSL API。任何指针?有什么可能会丢失?无法重新平衡具有多个主题分区的Kafka Streams中的错误

org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance 
     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:410) 
     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) 

Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_1] Store in-memory-avg-store's change log (cpu-streamz-in-memory-avg-store-changelog) does not contain partition 1 
     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:185) 
     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123) 
     at org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier$MemoryStore.init(InMemoryKeyValueStoreSupplier.java:102) 
     at org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.init(InMemoryKeyValueLoggedStore.java:56) 
     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85) 
     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81) 
     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119) 
     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633) 
     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660) 
     at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69) 
     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979) 
     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407) 

回答

3

这是一个操作问题。 Kafka Streams不允许在其“使用期限”期间更改输入主题分区的数量。

如果您停止正在运行的Kafka Streams应用程序,请更改输入主题分区的数量,然后重新启动您的应用程序将会中断(出现上述错误)。修复此问题对于生产用例是非常棘手的,因此强烈建议而不是更改输入主题分区的数量(请参阅下面的注释)。对于POC /演示,虽然不难解决。

为了解决这个问题,你应该使用卡夫卡的应用程序重置工具重置您的应用程序:

使用应用程序重置工具,有你擦的缺点出你的整个应用程序状态。因此,为了使您的应用程序进入与之前相同的状态,您需要从头开始重新处理整个输入主题。当然,只有当所有输入数据仍然可用并且没有任何内容由经纪人应用主题保留时间/大小策略时才被删除。

此外,您应该注意,向输入主题添加分区会更改主题的分区模式(通过键为默认的基于散列的分区)。由于Kafka Streams假设输入主题按键正确分区,所以如果使用重置工具并重新处理所有数据,则可能会得到错误的结果,因为“旧”数据的分区与“新”数据的分区不同(即添加新分区)。对于生产用例,您需要读取原始主题中的所有数据,并将其写入新主题(增加分区数)以正确分区数据(或者当然,此步骤可能会更改不同记录的排序钥匙 - 通常不应该是个问题 - 只是想提到它)。之后,您可以将新主题用作Streams应用程序的输入主题。

这个重新分区的步骤也可以在您通过使用运算符through("new_topic_with_more_partitions")直接读取原始主题并在进行任何实际处理之前在您的Streams应用程序中轻松完成。

但是,通常情况下,建议将生产用例的主题进行分区,以便以后不再需要更改分区数量。过度分区的开销很小,以后可以节省很多麻烦。如果您使用Kafka,这是一个一般性建议 - 它不仅限于Streams用例。

还有一个备注:

有些人可能会建议增加卡夫卡的分区数手动行旅内部主题。首先,这将是一个黑客不建议由于某些原因。

  1. 因为它取决于各种因素(因为它是一个Stream的内部实现细节),因此找出正确的数字是非常棘手的。
  2. 您还将面临打破分区方案的问题,如上段所述。因此,您的应用程序很可能以不一致的状态结束。

为了避免不一致的应用程序状态,Streams不会自动删除任何内部主题或更改内部主题分区的数量,但会失败并显示您报告的错误消息。这可以确保用户通过手动执行“清理”来了解所有含义。

BTW:对于即将到来的卡夫卡0.10.2此错误消息得到了改进:https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L100-L103

相关问题