我正在使用kafka流,并试图将KTable实现为主题。Apache Kafka Streams将KTables实现为主题似乎很慢
它有效,但似乎每隔30秒左右完成一次。
Kafka Stream如何决定将KTable的当前状态转化为主题?
有什么办法缩短这个时间,让它更“实时”吗?
这里是我使用
// Stream of random ints: (1,1) -> (6,6) -> (3,3)
// one record every 500ms
KStream<Integer, Integer> kStream = builder.stream(Serdes.Integer(), Serdes.Integer(), RandomNumberProducer.TOPIC);
// grouping by key
KGroupedStream<Integer, Integer> byKey = kStream.groupByKey(Serdes.Integer(), Serdes.Integer());
// same behaviour with or without the TimeWindow
KTable<Windowed<Integer>, Long> count = byKey.count(TimeWindows.of(1000L),"total");
// same behaviour with only count.to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC);
count.toStream().map((k,v) -> new KeyValue<>(k.key(), v)).to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC);
能帮你更清楚你要完成什么?一些代码?你的意思是你正在做类似于:ktable.to(“topic_name”)? –
是的,这正是我在做的 –
好吧,我理解你的问题,但不幸的是,我从来没有调整过这个特定的案例(我实际上需要类似的东西,因为我们正在实施一个解决方案,要求这是非常快速/最新),说我会开始玩可用配置:http://kafka.apache.org/documentation/#configuration可能首先与以下内容: 经纪人级别:log.flush。* ones,offset .commit.timeout.ms, 制作者超时。 话题等级:flush。* ones 流级别:提交* ones 如果您发现解决方案,请在此处发帖,它会很有用 –