2017-06-23 155 views
2

我正在使用kafka流,并试图将KTable实现为主题。Apache Kafka Streams将KTables实现为主题似乎很慢

它有效,但似乎每隔30秒左右完成一次。

Kafka Stream如何决定将KTable的当前状态转化为主题?

有什么办法缩短这个时间,让它更“实时”吗?

这里是我使用

// Stream of random ints: (1,1) -> (6,6) -> (3,3) 
// one record every 500ms 
KStream<Integer, Integer> kStream = builder.stream(Serdes.Integer(), Serdes.Integer(), RandomNumberProducer.TOPIC); 

// grouping by key 
KGroupedStream<Integer, Integer> byKey = kStream.groupByKey(Serdes.Integer(), Serdes.Integer()); 

// same behaviour with or without the TimeWindow 
KTable<Windowed<Integer>, Long> count = byKey.count(TimeWindows.of(1000L),"total"); 

// same behaviour with only count.to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC); 
count.toStream().map((k,v) -> new KeyValue<>(k.key(), v)).to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC); 
+0

能帮你更清楚你要完成什么?一些代码?你的意思是你正在做类似于:ktable.to(“topic_name”)? –

+0

是的,这正是我在做的 –

+1

好吧,我理解你的问题,但不幸的是,我从来没有调整过这个特定的案例(我实际上需要类似的东西,因为我们正在实施一个解决方案,要求这是非常快速/最新),说我会开始玩可用配置:http://kafka.apache.org/documentation/#configuration可能首先与以下内容: 经纪人级别:log.flush。* ones,offset .commit.timeout.ms, 制作者超时。 话题等级:flush。* ones 流级别:提交* ones 如果您发现解决方案,请在此处发帖,它会很有用 –

回答

2

这是由commit.interval.ms控制,默认为30秒的实际代码。更多的细节在这里: http://docs.confluent.io/current/streams/developer-guide.html

缓存的语义是数据刷新到状态存储和转发给下一个下游处理器节点每当最早commit.interval.ms或cache.max.bytes.buffering的(缓存压力)命中。

这里:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams

+0

超级谢谢Michal –