2017-04-20 75 views
2

我想用SinkTask保存数据时保证写入顺序。发生RetriableException时,Kafka Connect能否保证写入顺序?

如果我想在我的SinkTask.put()抛出RetriableException,将Kafka Connect写入到数据源无序比分区的顺序?

例如,如果在一个分区中的消息是1-2-3,如果写入消息2,可卡夫卡连接保证邮件到达的数据源是1-2-3期间发生异常?

据我所知,卡夫卡连接异步写入数据源。所以看起来好像数据将不按顺序到达数据源。

回答

1

简短的回答:是的,消息的发布顺序将被保留,但是你必须要处理的消息重新交付。

在您的例子,这意味着,如果SinkTask.put尝试传递到您的水槽下面的批处理的消息:1,2,3和1写入后并通过投掷RetriableException写入2之前失败,连接将暂停消费,并会尝试重新呼叫转交给SinkTask.put过程中失败的批次。这给了我们上述的两种效果:

一)连接将暂停消费者对这一任务/分区。这意味着在重试失败之前不会传送其他批消息。因此,消息顺序被保留。例如。如果传递1,2,3失败,RetriableException,Connect在传递1,2,3之前不会传递4,5,6。

b)连接将重试以传递在期间失败的整个消息集SinkTask.put。这意味着您的接收器会在再次尝试写入消息2之前再次看到消息1。

+0

完美。虽然,我不太明白你的意思*“你将不得不处理重新传送的消息”*因为你提到Connect会自动传送消息。这是否意味着正在写入的数据源必须处理在发生故障时正在写入的重复消息?例如,在你的例子中,数据源需要处理消息1再次被正确写入? – Glide

+1

对。我以你最初的例子为基础。您提到_在编写Message-2_期间发生异常,意味着msg1被Sink正确“处理”(例如它被写入文件)。当从你的接收器中抛出一个_RetriableException_时,这意味着你有办法重新处理批处理(你的接收器是幂等的),或者你不关心重复项。在一个文件的例子中,这意味着你有一种方法来重新附加部分用第1条消息写入的文件,以包含现在的消息1,2和3或解析重复项。连接重新输送整批。 –

+0

感谢您对卡夫卡的有益回应和贡献! – Glide

相关问题