2013-08-12 27 views
4

我正在运行0.8 Kafka,并使用提供的Java API构建生产者。
发送消息(或消息)的API函数返回void。Kafka - producer - handle“failed to send”

有没有办法获得发送消息的状态?如果它发送或失败?

这对我们来说非常重要,因为我们正在阅读来自文件的消息,并且我们希望在发送所有消息后删除文件。但是如果出现错误并且一些消息没有发送,我删除该文件会导致丢失非常重要的数据。

+0

只是猜测,但如果发送没有成功,可能会引发异常?所以如果你发现这个例外,你不会删除你的文件。 – asmaier

+0

“发送”函数是aSync(在不同线程稍后发生实际发送时立即返回),因此不会抛出异常。 –

回答

2

您可以配置您的生产者等待,直到它从Kafka集群获取数据(request.required.acks),以便在删除源文件之前确保数据已正确提交。

如果确实您需要确保消息发送成功,您可能需要考虑使生产者同步的选择(producer.type = sync)。这样,您将能够捕获阻塞调用引发的任何异常并相应地执行操作。 send()抛出的异常是kafka.common.FailedToSendMessageException。

卡夫卡的Java API并不理想,希望这对你有所帮助。

+0

Yeap,我把它留在那里,最近回到它,你是对的。 'type = sync'是确保消息被发送的唯一方式,但是这决定了地狱般的表现。 我们正在寻找不同的队列。 –

相关问题