2016-08-04 150 views
1

对于下面的代码,stream1和stream2都单独运行,我可以看到输出,但是加入的流根本不会记录任何内容。我有一种感觉,它与加入窗口有关,但两个数据流的数据几乎完全同时出现。无法获得加入的Kafka流来运行或输出任何信息

val stream = builder.stream(stringSerde, byteArraySerde, "topic") 

val stream1 = stream 
    .filter((key, value) => somefilter(key, value)) 
    .through(stringSerde, byteArraySerde, "topic1") 

val stream2 = stream 
    .filter((key, value) => someotherfilter(key, value)) 
    .through(stringSerde, byteArraySerde, "topic2") 

val joinedStream = stream1 
    .join(stream2, (value1: Array[Byte], value2: Array[Byte]) => { 
    println("wont print anything") 
    return somerandomdata 
    }, 
    JoinWindows.of("othertopic").within(10000L), 
    stringSerde, byteArraySerde, byteArraySerde) 
+1

联接窗口即包含在元数据每个记录都附加到键和值)。如果您打印这些时间戳进行调试,这将有所帮助。要访问它们,您需要使用process() - 给定的'context'对象,包含当前处理的记录的时间戳(即,每个处理后的记录更新上下文)。 –

回答

相关问题