2017-06-22 56 views
1

本主题应仅包含每个X的最新'文档​​X更新'事件。但我无法正确配置主题并保留多个副本。如何配置用作快照存储的卡夫卡主题

我的想法是保持细分,以及所有相关超时,清空和保留时间。

主题设置(我没有哪里有什么前缀的每个选项和一个足够清晰的认识应用,所以有可能是一些未使用的和无关的内容以及夸张的数字 - 更正欢迎):

"cleanup.policy"     -> "compact", 
"file.delete.delay.ms"    -> "10", 
"segment.bytes"      -> "10000", 
"delete.retention.ms"    -> "10", 
"retention.bytes"     -> "10000", 
"segment.ms"      -> "10", 
"retention.ms"      -> "10", 
"min.cleanable.dirty.ratio"   -> "0.001", 
"flush.messages"     -> "1", 
"flush.ms"       -> "10", 
"min.compaction.lag.ms"    -> "1", 
"log.cleaner.min.compaction.lag.ms" -> "1" 

我喂话题akka-streams-kafka

val ids = List("12345", ...) 

val publish: Future[Done] = Source(ids ++ ids ++ ids ++ ids ++ ids) 
    .map { id => 
    ProducerMessage.Message(new ProducerRecord[String, String](topic, id, id), id) 
    } 
    .via(producerFlow) 
    .map(logResult) 
    .runWith(Sink.ignore) 
Await.result(publish, 3.seconds) 

等待几秒钟后,我算的消息:

var count = 0 
val runCount = Consumer 
    .plainSource(consumerSettings, Subscriptions.topics(topic)) 
    .map { t => 
    count += 1 
    t 
    } 
    .runWith(Sink.ignore) 
Try { Await.result(runCount, timeout) } 

我希望消费者能够收到ids.length消息,但它总是会在第一次运行时接收所有生成的消息,而在后续运行时会收到更多消息。

确实发生了一些压缩 - 如果我多次运行测试,消耗的消息数停止增长,并且我看到kafka日志中的段删除 - 但每个密钥仍有多个消息。

如何将卡夫卡主题用作快照存储?

使用kafka 0.10.2.1

谢谢。

+0

我需要一些细节来调查:1.请问您可以提供经纪人日志,2.经纪人级别的配置将有所帮助。 –

+0

@SudheshRajan当然,这里是[重复测试执行过程中来自broker.log的行](https://gist.github.com/ksilin/095353de745ce8707d6150eae6796c18),这里是[server.properties](https:// gist .github.com/ksilin/415964ec885d5e7c695986046c04c65b)。 'server.properties'是香草。更多我可以提供的信息? – kostja

回答

1

根据卡夫卡规格:“日志压实确保卡夫卡将始终保留至少用于单个主题分区的记录数据中的每个消息密钥的最后已知值”。即Kafka并不保证每个密钥只保留一条消息,但它保证每个密钥始终具有最新的消息版本。

+0

日志压缩似乎是要走的路 – PragmaticProgrammer

+0

这是每个主题分区的最新部分,它保持打开供写入,并且没有压缩(直到它翻转并且新的分段文件成为写入活动部分)。否则,所有其他段在压缩周期结束后应该只有每个键的一条消息。 –

0

您可以尝试解决配置问题以查看是否可以实现您想要的操作(请参阅this),但我建议在应用程序级别处理它,仅将最新消息与该密钥一起用作有效消息,因为日志压缩运行在一个单独的线程上,每次更新后都无法触发它(即使有办法,效率也不会很高)。