2017-10-09 81 views
1

我的代码做更多的少这样的设置:卡夫卡流中线程分配的策略是什么?

// loop over the inTopicName(s) { 

KStream<String, String> stringInput = kBuilder.stream(STRING_SERDE, STRING_SERDE, inTopicName); 
stringInput.filter(streamFilter::passOrFilterMessages).map(processor_i).to(outTopicName); 

// } end of loop 

streams = new KafkaStreams(kBuilder, streamsConfig); 
streams.cleanUp(); 
streams.start(); 

如果有例如num.stream.threads> 1,如何将任务分配给准备和分配的(在循环中)线程?

我想(我不确定)有线程池和一些循环策略的任务分配给线程,但它可以完全动态地在运行时完成,或者在开始时通过创建过滤/映射到结构。

特别是当一个主题正在执行计算密集型任务而其他时间没有的情况下,我感到非常有趣。是否有可能应用程序会因为所有线程将分配给耗时的处理器而饿死。

让我们玩了一下与场景:num.stream.threads=2no. partitions=4每个主题,no. topics=2(huge_topic和slim_topic) 在我的问题的循环一次在应用程序启动时完成的。如果在循环中我定义了2个主题,并且我从一个主题知道重量加权(huge_topic)的消息,而另一个主题则来自轻量级消息(slim_topic)。 是否有可能num.stream.threads的两个线程只会忙于来自huge_topic的任务?来自slimm_topic的消息将不得不等待处理?

回答

2

如果有例如, num.stream.threads> 1,如何将任务分配给 准备并分配(在循环中)的线程?

任务分配给使用分区石斑鱼的线程。你可以阅读关于它here。 AFAIK在重新平衡之后被调用,所以它不是一个非常动态的过程。这就是说,我认为没有饥饿的选择。

+0

感谢您的回答。这是迭代。如果我有例如* num.stream.threads *参数,那么是什么每个主题有10个分区? –

+0

您可以在一个应用程序中拥有多个处理线程,并且您可以拥有多个应用程序实例。最后你有权决定,而不是卡夫卡。在Kafka docs的线程建模部分中的更多细节(Confluent one):https://docs.confluent.io/current/streams/architecture.html?highlight=num%20stream%20threads#threading-model – Arek

+0

我知道你的页面曾参考:-)。我通过添加示例扩展了我的问题。 –

1

在内部,Kafka Streams基于分区创建任务。用你的循环例子,假设你有3个输入题目A,B,C分别有2,4和3个分区。对于这一点,你会得到4任务(即,在所有主题分区的最大数量)与下列分区任务分配:

  • T0:A-0,B-0,C-0
  • T1 :A-1,B-1,C-1
  • T2:              B-2,C-2
  • T3:              B-3

分区按“编号”分组并分配给相应的任务。这是在运行时确定的(即,在您致电KafakStreams#start()之后),因为在此之前,每个主题的分区数量未知。

如果您不了解卡夫卡流的所有内部细节,那么不建议混乱分组分区 - 您可以非常轻松地分解东西!

关于线程:任务限制了线程的数量。对于我们的示例,这意味着您可以拥有最多4个线程(如果您拥有更多线程,那些线程将处于空闲状态,因为没有任何任务留给线程分配)。你如何“分配”这些线程取决于你。您可以使用4个线程(或之间的任何内容)为单个应用程序实例提供4个单线程应用程序实例。

如果您的任务比线程少,则会根据任务数量(假定所有任务具有相同的负载)以负载均衡方式分配任务。