本主题应仅包含每个X的最新'文档X更新'事件。但我无法正确配置主题并保留多个副本。如何配置用作快照存储的卡夫卡主题
我的想法是保持细分,以及所有相关超时,清空和保留时间。
主题设置(我没有哪里有什么前缀的每个选项和一个足够清晰的认识应用,所以有可能是一些未使用的和无关的内容以及夸张的数字 - 更正欢迎):
"cleanup.policy" -> "compact",
"file.delete.delay.ms" -> "10",
"segment.bytes" -> "10000",
"delete.retention.ms" -> "10",
"retention.bytes" -> "10000",
"segment.ms" -> "10",
"retention.ms" -> "10",
"min.cleanable.dirty.ratio" -> "0.001",
"flush.messages" -> "1",
"flush.ms" -> "10",
"min.compaction.lag.ms" -> "1",
"log.cleaner.min.compaction.lag.ms" -> "1"
我喂话题akka-streams-kafka:
val ids = List("12345", ...)
val publish: Future[Done] = Source(ids ++ ids ++ ids ++ ids ++ ids)
.map { id =>
ProducerMessage.Message(new ProducerRecord[String, String](topic, id, id), id)
}
.via(producerFlow)
.map(logResult)
.runWith(Sink.ignore)
Await.result(publish, 3.seconds)
等待几秒钟后,我算的消息:
var count = 0
val runCount = Consumer
.plainSource(consumerSettings, Subscriptions.topics(topic))
.map { t =>
count += 1
t
}
.runWith(Sink.ignore)
Try { Await.result(runCount, timeout) }
我希望消费者能够收到ids.length
消息,但它总是会在第一次运行时接收所有生成的消息,而在后续运行时会收到更多消息。
确实发生了一些压缩 - 如果我多次运行测试,消耗的消息数停止增长,并且我看到kafka日志中的段删除 - 但每个密钥仍有多个消息。
如何将卡夫卡主题用作快照存储?
使用kafka 0.10.2.1
谢谢。
我需要一些细节来调查:1.请问您可以提供经纪人日志,2.经纪人级别的配置将有所帮助。 –
@SudheshRajan当然,这里是[重复测试执行过程中来自broker.log的行](https://gist.github.com/ksilin/095353de745ce8707d6150eae6796c18),这里是[server.properties](https:// gist .github.com/ksilin/415964ec885d5e7c695986046c04c65b)。 'server.properties'是香草。更多我可以提供的信息? – kostja