2017-04-14 290 views
4

我很新的kafka(也英语......),我面临这个问题,并不能谷歌任何解决方案。kafka停止消费从新分配后的新分配的分区

我我的本地机器上使用弹簧启动,弹簧卡夫卡的支持,我已经安装了kafka_2.11-0.10.1.1(只有一个经纪人0)

s1.then我通过

制造话题
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic tracking 

我的消费配置: applitions.properties:

kafka.servers.bootstrap=localhost:9092 
kafka.topic.tracking=tracking 
kafka.group.id=trackingGroup 
kafka.client.id=client-1 

S2。然后,我通过更改'kafka.client.id'启动3位用户并运行spring-boot main class。在eclipse控制台上,我可以检查分区分配:

client-1: partitions assigned:[tracking-4, tracking-3] 
client-2: partitions assigned:[tracking-2, tracking-1] 
client-3: partitions assigned:[tracking-0] 

s3。启动pruducer向主题发送20条消息,每个消息开始消费特定分区

s4。我关闭消耗1,卡夫卡自动进行再平衡, 新的分区分配:

client-1: partitions assigned:[] 
client-2: partitions assigned:[tracking-2,tracking-1, tracking-0] 
client-3: partitions assigned:[tracking-4,tracking-3] 

S5。我发现分区'tracking-3'上的消息没有消耗!

问题可以每次重现,在新分配的分区丢失一些消息损失,你可以任何suggesstions?请帮助我,谢谢

+0

我不知道你关闭客户端1点什么。 client-1在关闭之前可能已经消耗了跟踪-3的所有消息。关闭客户端1后,您是否尝试向主题发送更多消息? – yaswanth

+0

我按ctrl + c关闭客户端-1,客户端-1没有消耗跟踪-3中的所有消息,我尝试了多次消息。但仍然丢失消息 –

回答

3

我转载了它;在重新平衡中,kafka本身(auto.comit.enabled=true)看起来像是一个问题,kafka报告未分区(the offset of the <i>next record</i> that will be fetched (if a record with that offset exists))的“位置”作为分区的结束。

实际上,当我使用kafka-consumer-groups工具时,未读分区的偏移量已经在“结束”。当我只用一个消费者运行它时,它正在读取第一个分区,我看到...

$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe -group so43405009 

TOPIC       PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG  CONSUMER-ID          HOST       CLIENT-ID 
tracking      0   37    40    3   client1-8129bb3d-3a83-4c83-9128-3a2762ede758  /10.0.0.6      client1 
tracking      1   40    40    0   client1-8129bb3d-3a83-4c83-9128-3a2762ede758  /10.0.0.6      client1 
tracking      2   40    40    0   client1-8129bb3d-3a83-4c83-9128-3a2762ede758  /10.0.0.6      client1 
tracking      3   40    40    0   client1-8129bb3d-3a83-4c83-9128-3a2762ede758  /10.0.0.6      client1 
tracking      4   40    40    0   client1-8129bb3d-3a83-4c83-9128-3a2762ede758  /10.0.0.6      client1 

注意CURRENT_OFFSET列。

在接下来的运行,我跑了两次,一次是在正在处理的第一个分区,再晚了一点......

TOPIC       PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG  CONSUMER-ID          HOST       CLIENT-ID 
tracking      0   41    44    3   client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8  /10.0.0.6      client1 
tracking      1   44    44    0   client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8  /10.0.0.6      client1 
tracking      2   44    44    0   client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8  /10.0.0.6      client1 
tracking      3   44    44    0   client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8  /10.0.0.6      client1 
tracking      4   44    44    0   client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8  /10.0.0.6      client1 

TOPIC       PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG  CONSUMER-ID          HOST       CLIENT-ID 
tracking      0   44    44    0   client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8  /10.0.0.6      client1 
tracking      1   44    44    0   client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8  /10.0.0.6      client1 
tracking      2   41    44    3   client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8  /10.0.0.6      client1 
tracking      3   44    44    0   client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8  /10.0.0.6      client1 
tracking      4   44    44    0   client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8  /10.0.0.6      client1 

如何看当前分区2的偏移下降44〜41

禁用自动提交解决了这个问题对我来说...

spring.kafka.consumer.enable-auto-commit=false 

...

TOPIC       PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG  CONSUMER-ID          HOST       CLIENT-ID 
tracking      0   52    52    0   client1-59413599-81e8-49dd-bbd7-8a62152f11e5  /10.0.0.6      client1 
tracking      1   49    52    3   client1-59413599-81e8-49dd-bbd7-8a62152f11e5  /10.0.0.6      client1 
tracking      2   49    52    3   client2-edfe34f9-08d5-4825-80d0-4a6cf9526e42  /10.0.0.6      client2 
tracking      3   48    52    4   client2-edfe34f9-08d5-4825-80d0-4a6cf9526e42  /10.0.0.6      client2 
tracking      4   51    52    1   client3-20da8742-af38-403e-b125-5d0c7c771319  /10.0.0.6      client3 

这里是我的测试程序:

@SpringBootApplication 
public class So43405009Application implements CommandLineRunner { 

    public static void main(String[] args) { 
     SpringApplication.run(So43405009Application.class, args); 
    } 

    @Autowired 
    private KafkaTemplate<String, String> template; 

    @Value("${spring.kafka.consumer.client-id}") 
    private String clientId; 

    @Override 
    public void run(String... args) throws Exception { 
     if (this.clientId.endsWith("1")) { 
      for (int i = 0; i < 20; i++) { 
       this.template.sendDefault("foo" + i); 
      } 
     } 
    } 

    @Bean 
    public KafkaMessageListenerContainer<String, String> container(ConsumerFactory<String, String> cf) { 
     ContainerProperties containerProperties = new ContainerProperties("tracking"); 
     containerProperties.setMessageListener((MessageListener<?, ?>) d -> { 
      System.out.println(d); 
      try { 
       Thread.sleep(5_000); 
      } 
      catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
      } 
     }); 
     KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(cf, 
       containerProperties); 
     return container; 
    } 

} 

与性能

spring.kafka.listener.ack-mode=record 
spring.kafka.consumer.enable-auto-commit=false 
spring.kafka.consumer.auto-offset-reset=earliest 
spring.kafka.consumer.group-id=so43405009 
spring.kafka.consumer.client-id=client1 
spring.kafka.template.default-topic=tracking 

我看到0.10.2.0相同的结果也是如此。

EDIT

它原来是一个弹簧卡夫卡错误;它的工作原理与自动提交启用,但你必须明确地启用它

spring.kafka.consumer.enable-auto-commit=true 

否则容器假定它是false并导致上述奇怪的行为 - 看起来像客户端不喜欢打电话消费者如果自动提交方法-commit已启用。 #288

我一般会建议设置为false,并选择容器的AckMode!而非一个;例如RECORD提交之后提交记录,BATCH之后通过投票收到每个批次(默认)。

+0

有关根本原因和解决方法的更多详细信息,请参阅编辑我的答案。 –

+0

真的很感谢你加里,U帮了我很多! –

+0

我想跟着你的建议----“设置为false,并选择容器的AckModes之一代替;!如记录提交后不断记录,批后,每通过轮询收到的批次(默认)“。但可以ü请给一些示例代码或链接(使用弹簧卡夫卡),因为我真正的新卡夫卡和官方弹簧卡夫卡文档似乎不太足够。 –