2017-02-27 115 views
1
final KStream<String, EmpModel> empModelStream = getMapOperator(empoutStream); 
final KStream<String, EmpModel> empModelinput = getMapOperator(inputStream); 

// empModelinput.print(); 
// empModelStream.print(); 

empModelStream.join(empModelinput, new ValueJoiner<EmpModel, EmpModel, Object>() { 

    @Override 
    public Object apply(EmpModel paramV1, EmpModel paramV2) { 
     System.out.println("Model1 "+paramV1.getKey()); 
     System.out.println("Model2 "+paramV2.getKey()); 
     return paramV1; 
    } 

},JoinWindows.of("2000L")); 

两个KStreams我得到错误:卡夫卡流API:我加盟empmodel

Invalid topology building: KSTREAM-MAP-0000000003 and KSTREAM-MAP-0000000004 are not joinable

回答

2

如果你想连接两个KStreams你必须确保两个具有相同数量的分区。 (参见“注” 中​​盒)

如果使用卡夫卡v0.10.1+,重新分区将自动发生(参见http://docs.confluent.io/current/streams/upgrade-guide.html#auto-repartitioning)。

卡夫卡v0.10.0.x你有两种选择:

  1. 确保原始输入主题做有相同数量的分区
  2. 的,或者以.through("my-repartitioning-topic")呼叫在连接前添加到KStream S的一个。您需要创建主题"my-repartioning-topic"与正确数量的分区(即,分区数量与第二个KStream的原始输入主题相同),然后再启动您的Streams应用程序