我的代码做更多的少这样的设置:卡夫卡流中线程分配的策略是什么?
// loop over the inTopicName(s) {
KStream<String, String> stringInput = kBuilder.stream(STRING_SERDE, STRING_SERDE, inTopicName);
stringInput.filter(streamFilter::passOrFilterMessages).map(processor_i).to(outTopicName);
// } end of loop
streams = new KafkaStreams(kBuilder, streamsConfig);
streams.cleanUp();
streams.start();
如果有例如num.stream.threads> 1,如何将任务分配给准备和分配的(在循环中)线程?
我想(我不确定)有线程池和一些循环策略的任务分配给线程,但它可以完全动态地在运行时完成,或者在开始时通过创建过滤/映射到结构。
特别是当一个主题正在执行计算密集型任务而其他时间没有的情况下,我感到非常有趣。是否有可能应用程序会因为所有线程将分配给耗时的处理器而饿死。
让我们玩了一下与场景:num.stream.threads=2
,no. partitions=4
每个主题,no. topics=2
(huge_topic和slim_topic) 在我的问题的循环一次在应用程序启动时完成的。如果在循环中我定义了2个主题,并且我从一个主题知道重量加权(huge_topic)的消息,而另一个主题则来自轻量级消息(slim_topic)。 是否有可能num.stream.threads的两个线程只会忙于来自huge_topic的任务?来自slimm_topic的消息将不得不等待处理?
感谢您的回答。这是迭代。如果我有例如* num.stream.threads *参数,那么是什么每个主题有10个分区? –
您可以在一个应用程序中拥有多个处理线程,并且您可以拥有多个应用程序实例。最后你有权决定,而不是卡夫卡。在Kafka docs的线程建模部分中的更多细节(Confluent one):https://docs.confluent.io/current/streams/architecture.html?highlight=num%20stream%20threads#threading-model – Arek
我知道你的页面曾参考:-)。我通过添加示例扩展了我的问题。 –