2017-02-17 202 views
3

我有一个Kafka Streams(0.10.1.1)的问题。我正在尝试在同一主题上创建一个KStream和一个KTableKafka Streams - 获得KTable和KStream相同主题的最佳方式?

我试过的第一种方法是简单地调用KStreamBuilder方法来处理同一主题上的流和表。这导致

org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic <topicName> has already been registered by another source. 

好的,这似乎是Kafka Streams内置的一些限制。

我的第二种方法是最初创建一个KTable并使用其上的toStream()方法。这有KTables做一些内部缓冲/冲洗的问题,所以如果我的例子中键多次出现,输出流不会反映所有输入元素。这是一个问题,因为我正在计算密钥的出现次数。

似乎工作的方法是最初创建一个KStream,按键对它进行分组,然后通过丢弃旧聚合并仅保留新值来“减少”它。我对这种方法并不满意,因为a)它看起来非常复杂,并且b)接口没有指定哪个是已经聚集的值,哪个是新的。我按照惯例去了第二个,但是...... meh。

所以问题归结为:有没有更好的方法?我错过了一些非常明显的东西吗?

请记住,我没有正确使用用例 - 这只是我开始了解Streams-API。

回答

3

关于添加主题两次:这是不可能的,因为Kafka Streams应用程序是单个“消费者组”,因此一次只能提交一个主题的偏移量,而两次添加主题则表明该主题获取的消费者两次(和独立进展)。

对于方法KTable#toString(),可以通过StreamsConfig参数cache.max.bytes.buffering == 0禁用高速缓存。但是,这是全局设置,并禁用所有KTable s的缓存/重复数据删除(请参阅http://docs.confluent.io/current/streams/developer-guide.html#memory-management)。

groupBy方法也适用,即使它需要一些样板。我们正在考虑在API中添加KStream#toTable()以简化此转换。是的,reduce中的第二个参数是新值 - 由于reduce是用于合并两个值,所以API没有“old”和“new”的概念,因此参数没有这样的命名。

+0

为什么不能重复使用以前添加的主题? – kvatashydze

+0

正如答案中所述。因为我们使用单个消费者,而消费者只能订阅一个主题。 –

相关问题