2014-10-28 61 views
0

我在那里,我在斯卡拉实施卡夫卡生产者客户一个非常奇怪的问题,而当卡夫卡群集了一切工作正常。也就是说,制作人将数据正确推入卡夫卡。但是,如果我将我的制作者设置为“异步”,则Kafka Producer客户端将正确放置数据,如果群集已启动且正在工作,但生产者还会将数据推入...无效...如果集群已关闭!也就是说,如果群集关闭,Kafka Producer不会返回错误。卡夫卡0.8.1 Scala的异步生产者 - 无法检测卡夫卡群集关闭

这对我来说很关键,因为当生产者发现要推入卡夫卡的数据时,它会删除该数据源...因此,如果集群出现故障,生产者只会销毁所有源数据。

如果生产者被设置成“同步”模式然而,生产者并不正确默认3重试之后失效,根据所述属性(我认为message.send.max.retries)。

这里有什么想法吗?

回答

1

这是异步生产者的记录的行为。如果您确实在乎是否将消息发送给代理,请使用sync-producer。或者,更好的是(尽管尚未完整记录),使用Kafka 0.8.2-beta和新的生产者 - 这可以让你异步地发送消息,但检查返回的“未来”对象的状态。