2
在函数中,有没有办法在使用filter
后返回两个DStream? 例如,当我过滤DStream
时,已过滤的将存储在DStream
中,未过滤的将存储在另一个DStream
中。在火花流中使用过滤器转换后,如何在函数中返回两个DStream?
在函数中,有没有办法在使用filter
后返回两个DStream? 例如,当我过滤DStream
时,已过滤的将存储在DStream
中,未过滤的将存储在另一个DStream
中。在火花流中使用过滤器转换后,如何在函数中返回两个DStream?
这可以更有效的,如果它是内置的完成,但
def partition[A](stream: DStream[A])(pred: A => Boolean) {
val stream1 = stream.map(x => (x, pred(x)).cache()
val good = stream1.filter(_._2).map(_._1)
val bad = stream1.filter(!_._2).map(_._1)
(good, bad)
}
注cache()
需要确保stream1
只计算一次;如果pred
足够简单,并且stream
已被缓存,则只需(stream.filter(pred), stream.filter(x => !pred(x)))
应该更快。
但是DStream最初包含过滤和未过滤的元素。我想在同一时间返回过滤和未过滤! –
所以你不是指“未过滤”,你的意思是“用相反的谓词过滤”?即一个类似于Scala集合上的'partition'的函数)? –
哦,我想是这样!它是“用相反的谓词过滤” –