2017-08-02 119 views
1

我使用卡夫卡版本0.10.2.1春季启动我的项目。卡夫卡重复读

我具有可以通过多个消费者消耗一个话题的5个分区(具有相同组ID),这些不同的机器上运行。

我面对什么问题是

我收到重复这些卡夫卡警告日志读取单个消息的

Auto offset commit failed for group my-consumer-group: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

由于日志表明,这个问题是因为卡夫卡消费未能提交。

这里是我的用例几个细节:

  • 我有多个消费者属于同一个话题My-Topic的组ID my-consumer-group

  • 消费者从卡夫卡使用消息,应用业务逻辑并存储处理的数据Cassandra

  • 消费来自卡夫卡的消息的过程,应用业务逻辑,然后将其保存到Cassandra 需要约10毫秒消费从卡夫卡消息。

我使用下面的代码创建卡夫卡消费者豆

@Configuration 
@EnableKafka 
public class KafkaConsumer { 
    @Value("${spring.kafka.bootstrap-servers}") 
    private String brokerURL; 

    @Value("${spring.kafka.session.timeout}") 
    private int sessionTimeout; 

    @Value("${spring.kafka.consumer.my-group-id}") 
    private String groupId; 

    @Value("${spring.kafka.listener.concurrency}") 
    private int concurrency; 

    @Value("${spring.kafka.listener.poll-timeout}") 
    private int timeout; 

    @Value("${spring.kafka.consumer.enable-auto-commit}") 
    private boolean autoCommit; 

    @Value("${spring.kafka.consumer.auto-commit-interval}") 
    private String autoCommitInterval; 

    @Value("${spring.kafka.consumer.auto-offset-reset}") 
    private String autoOffsetReset; 

    @Bean 
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     factory.setConcurrency(concurrency); 
     factory.getContainerProperties().setPollTimeout(timeout); 
     return factory; 
    } 

    @Bean 
    public ConsumerFactory<String, String> consumerFactory() { 
     return new DefaultKafkaConsumerFactory<>(consumerConfigs()); 
    } 

    @Bean 
    public Map<String, Object> consumerConfigs() { 
     Map<String, Object> propsMap = new HashMap<>(); 
     propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerURL); 
     propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit); 
     propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); 
     propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); 
     propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 
     propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); 
     return propsMap; 
    } 
} 

这些都是我使用

spring.kafka.listener.concurrency=2 
spring.kafka.listener.poll-timeout=3000 
spring.kafka.consumer.auto-commit-interval=1000 
spring.kafka.consumer.enable-auto-commit=true 
spring.kafka.consumer.auto-offset-reset=earliest 
spring.kafka.session.timeout=50000 
spring.kafka.connection.timeout=10000 
spring.kafka.topic.partition=5 
spring.kafka.message.replication=2 

我最关心的卡夫卡配置是重复读由属于同一消费群体的多个卡夫卡消费者发出的消息以及我的应用程序中,我必须避免重复输入数据库。

请帮我看看我的上面的卡夫卡配置和卡夫卡消费者代码,以便我可以避免重复阅读。

回答

0

简单的答案是不要使用autoCommit - 它按照时间表提交。

取而代之,让容器做提交;使用AckModeRECORD

但是你仍然应该使你的代码幂等 - 总是有可能重新传递;只是在更可靠的提交策略下,概率会更小。

+0

问题是我有一个卡桑德拉柜台专栏,根据卡夫卡消费者收到的信息得到递增。 如果发生重复读取,它将不止一次增加计数器,这将导致错误的分析。 –

+0

欢迎来到消息传递的世界。对于你的情况,“恰好一次”交付是不可能实现的(如果你不相信我的话,它就是谷歌)。正如我所说的,您可以最小化但不能消除重复交付的可能性。考虑更新mongodb然后服务器在提交kafka偏移量之前崩溃的情况;结果 - 重新交付。如果它很重要,你必须首先检查mongo,看看你是否已经存储了这个事件。 –

+0

谢谢@加里。你的回答真的帮了我。如果只有一次“交付是不可能的,那么银行和关键任务系统的工作方式我知道他们使用RDBMS,但他们使用什么消息工具 –