2017-07-31 157 views
0

汇合的高级消费者here具有以下代码(为了简洁起见而修剪)。Confluent .net(rdkafka)提交消费者处置

 using (var consumer = new Consumer<Null, string>(constructConfig(brokerList, false), null, new StringDeserializer(Encoding.UTF8))) 
     { 
      while (!cancelled) 
      { 
       Message<Null, string> msg; 
       if (!consumer.Consume(out msg, TimeSpan.FromMilliseconds(100))) 
       { 
        continue; 
       }      

       if (msg.Offset % 5 == 0) 
       {       
        consumer.CommitAsync(msg).Result;       
       } 
      } 
     } 

自动提交是错误的。 我的问题是,如果'已取消'触发器被标记,但还有未完成的提交时会发生什么。这些消息是否未提交,因此会再次收到?我希望消费者会承诺处置,但在实施过程中我看不到这种情况。我可以做一些测试来看看会发生什么,但是我希望得到一个'官方'的答案,以防我的测试不能涵盖所有情况。

+0

与实际问题无关,但:它是否是正确的实施?没有内存隔阂,确保'取消= true;'是否被其他线程观察过? – zerkms

+0

让我们说'取消'是挥发性的 – acarlon

回答

1

首先请注意,只有在enable.auto.commit设置为false(这是Advanced Consumer示例中的情况 - 这需要更多评论和wiki输入)时,才应该使用CommitAsync。如果使用的是自动提交(默认),废弃时,您将自动提交(这是当调用rd_kafka_destroy librdkafka侧)

在这里,我们正在等待CommitAsync完成(通过.Result),这就是所谓的与while循环相同的线程,所以不会有“在飞行中”提交请求。

如果您使用手动提交(在完整的代码片段中就是这种情况),您必须在处理之前手动提交 - 实例中确实缺少,将添加它。如上所述,在手动提交中,您必须自己承诺(有些情况下用户在处置时不会提交)

我还发现了一个错误,如果您调用CommitAsync但不要等到它发生之前(https://github.com/confluentinc/confluent-kafka-dotnet/issues/279)因此,您应该始终等待CommitAsync在临时解除之前完成

+0

+ 1的处置提示有关commitAsync。但我的问题依然存在 - 未完成的承诺是否会被处置?从我的测试到目前为止,我认为答案是否定的。 – acarlon

+0

当enable.auto.commit为True时,consumer会对dispose进行提交,但在我的情况下,它仍然卡住。最终(70分钟后)超时:Confluent.Kafka.KafkaException:本地:超时 at Confluent.Kafka.Impl.SafeKafkaHandle.ConsumerClose() –

+0

如果使用自动提交,则会执行。如果指定手动提交,则必须在处理之前调用并等待CommitAsync完成。如果你不等待它,行为是不可预测的(可能会提交,可能不会,并且可能会抛出异常)。如果不清楚,请详细说明你的意思是“优秀承诺 @SasaNinkovic我很想知道更多,你可以在这里详细说明你的配置或发布在github上的问题?谢谢! – Treziac