2015-03-18 75 views

回答

2

以下是我如何做到的。创建一个空的RDD,它是您的previousWindow。然后在forEachRDD中,计算最后一个窗口和当前窗口之间的差异。如果当前窗口包含的记录不在以前的窗口中,则该批次中有新内容。最后,将上一个窗口设置为当前窗口中的内容。

... 

    var previousWindowRdd = sc.emptyRDD[String] 

    dStream.foreachRDD { 
    windowRdd => { 
     if (!windowRdd.isEmpty) processWindow(windowRdd.cache()) 
    } 
    } 

    ... 

def processWindow(windowRdd: RDD[String]) = { 
    val newInBatch = windowRdd.subtract(previousWindowRdd) 

    if (!newInBatch.isEmpty()) 
    processNewBatch(windowRdd) 

    previousWindowRdd = windowRdd 
} 
相关问题