2016-04-21 36 views

回答

3

这可以更有效的,如果它是内置的完成,但

def partition[A](stream: DStream[A])(pred: A => Boolean) { 
    val stream1 = stream.map(x => (x, pred(x)).cache() 
    val good = stream1.filter(_._2).map(_._1) 
    val bad = stream1.filter(!_._2).map(_._1) 
    (good, bad) 
} 

cache()需要确保stream1只计算一次;如果pred足够简单,并且stream已被缓存,则只需(stream.filter(pred), stream.filter(x => !pred(x)))应该更快。

+0

但是DStream最初包含过滤和未过滤的元素。我想在同一时间返回过滤和未过滤! –

+0

所以你不是指“未过滤”,你的意思是“用相反的谓词过滤”?即一个类似于Scala集合上的'partition'的函数)? –

+0

哦,我想是这样!它是“用相反的谓词过滤” –