2016-06-20 43 views
1

我有2个输入,其中第一个输入是流(比如input1),第二个输入是批处理(比如input2)。 我想弄清楚第一个输入中的键是否与第二个输入中的单个行或多个行匹配。 的进一步转化/逻辑取决于匹配的行数,是否单列匹配或多个行匹配(在第一输入ATLEAST一个键)如何确定DStream是否为空

if(single row matches){ 
    // do something 
}else{ 
    // do something 
} 

代码,我试图到目前为止

val input1Pair = streamData.map(x => (x._1, x)) 
val input2Pair = input2.map(x => (x._1, x)) 
val joinData = input1Pair.transform{ x => input2Pair.leftOuterJoin(x)} 
val result = joinData.mapValues{ 
    case(v, Some(a)) => 1L 
    case(v, None) => 0 
}.reduceByKey(_ + _).filter(_._2 > 1) 

我已经完成了上面的编码。 当我做result.print时,如果所有的键只与input2中的一行相匹配,它将不会打印任何内容。 由于DStream可能有多个RDD,所以不知道如何确定DStream是否为空。如果这是可能的,那么我可以做一个检查。

回答

3

由于DStream代表随时间推移的集合,因此无法确定DStream是否为空。从概念的角度来看,一个空的DStream将是一个永远不会有数据的流,并不会很有用。

什么可以做是为了检查一个给定的微量分批有数据还是没有:

dstream.foreachRDD{ rdd => if (rdd.isEmpty) {...} } 

请注意,在任何给定时间点,只有一个RDD。

我认为实际的问题是如何检查参考RDD和DStream中的数据之间的匹配数量。也许最简单的方法是将交叉两个集合,并检查交集大小:

val intersectionDStream = streamData.transform{rdd => rdd.intersection(input2)} 
intersectionDStream.foreachRDD{rdd => 
    if (rdd.count > 1) { 
     ..do stuff with the matches 
    } else { 
     ..do otherwise 
    } 
} 

我们也可以放置foreachRDD操作中的RDD为中心的转变:

streamData.foreachRDD{rdd => 
    val matches = rdd.intersection(input2) 
    if (matches.count > 1) { 
     ..do stuff with the matches 
    } else { 
     ..do otherwise 
    } 
} 
+0

非常感谢您的回复。在我的情况下,input1 RDD类型不同于input2 RDD类型。最后使用cogroups实现。 – Dazzler