我有一个Kafka Streams(0.10.1.1)的问题。我正在尝试在同一主题上创建一个KStream
和一个KTable
。Kafka Streams - 获得KTable和KStream相同主题的最佳方式?
我试过的第一种方法是简单地调用KStreamBuilder
方法来处理同一主题上的流和表。这导致
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic <topicName> has already been registered by another source.
好的,这似乎是Kafka Streams内置的一些限制。
我的第二种方法是最初创建一个KTable
并使用其上的toStream()
方法。这有KTables
做一些内部缓冲/冲洗的问题,所以如果我的例子中键多次出现,输出流不会反映所有输入元素。这是一个问题,因为我正在计算密钥的出现次数。
似乎工作的方法是最初创建一个KStream
,按键对它进行分组,然后通过丢弃旧聚合并仅保留新值来“减少”它。我对这种方法并不满意,因为a)它看起来非常复杂,并且b)接口没有指定哪个是已经聚集的值,哪个是新的。我按照惯例去了第二个,但是...... meh。
所以问题归结为:有没有更好的方法?我错过了一些非常明显的东西吗?
请记住,我没有正确使用用例 - 这只是我开始了解Streams-API。
为什么不能重复使用以前添加的主题? – kvatashydze
正如答案中所述。因为我们使用单个消费者,而消费者只能订阅一个主题。 –