2017-12-27 1281 views
0

我有一个应用程序需要监听多个不同的主题;每个主题都有独立的消息处理逻辑。我曾经想过为每个KafkaStreams实例使用相同的kafka属性,但是我得到如下所示的错误。Kafka Streams:使用相同的`application.id`来消费多个主题

错误

java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic 

代码(科特林)

class KafkaSetup() { 
    companion object { 
     private val LOG = LoggerFactory.getLogger(this::class.java) 
    } 

    fun getProperties(): Properties { 
     val properties = Properties() 
     properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app") 
     return properties 
    } 

    private fun listenOnMyTopic() { 
     val kStreamBuilder = KStreamBuilder() 
     val kStream: KStream<String, String> = kStreamBuilder.stream("my-topic") 

     kStream.foreach { key, value -> LOG.info("do stuff") } 

     val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties()) 
     kafkaStreams.start() 
    } 

    private fun listenOnMyOtherTopic() { 
     val kStreamBuilder = KStreamBuilder() 
     val kStream: KStream<String, String> = kStreamBuilder.stream("my-other-topic") 

     kStream.foreach { key, value -> LOG.info("do other stuff") } 

     val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties()) 
     kafkaStreams.start() 
    } 
} 

,我发现这个reference那建议你不能使用application.id多个主题,但我发现很难找到参考文件来支持。 documentation对于application.id指出:

流处理应用的标识符。在Kafka集群中必须是唯一的。它用作1)默认客户端ID前缀,2)用于成员资格管理的组ID,3)变更日志主题前缀。

问题

  1. 这个错误是什么意思,什么原因造成的。
  2. 鉴于您可以使用多个主题分区使用同一个ID运行的多个应用程序实例,“在Kafka集群中必须是唯一的”是什么意思?
  3. 你可以使用相同的卡夫卡流application.id开始两个KafkaStreams列在不同的主题?如果是这样,怎么样?

详情:卡夫卡0.11.0.2

回答

2

卡夫卡流通过分区,而不是主题缩放。因此,如果您使用相同的application.id启动多个应用程序,则它们必须与它们所订阅的输入主题及其处理逻辑相同。该应用程序使用application.id作为group.id形成消费者组,因此输入主题的不同分区被分配给不同的实例。

如果您有与相同逻辑不同的主题,你可以订阅所有话题一次(在每个实例启动)。虽然缩放仍然基于分区。 (它基本上是输入主题的“合并”。)

如果要通过主题缩放和/或具有不同的处理逻辑,则必须针对不同的Kafka Streams应用程序使用不同的application.id

+0

谢谢。是否有任何文件声明_“具有相同application.id的多个应用程序,它们必须与输入主题相同”_? –

+0

我不确定。请注意,AK支持Streams for v1。0重新atm重新atm - 评论/建议欢迎通过邮件列表。 –

相关问题