2015-04-06 65 views
2

是否可以取消未来的火花并且仍然可以通过处理后的元素获得更小的RDD?Spark异步接口的部分结果?

星火异步操作“记录”在这里

http://spark.apache.org/docs/1.2.1/api/scala/index.html#org.apache.spark.rdd.AsyncRDDActions

而未来本身具有丰富的功能集

http://spark.apache.org/docs/1.2.1/api/scala/index.html#org.apache.spark.FutureAction

使用情况下,我想的就是有一个非常大的地图,可以在计算30分钟后中止,并且仍然收集 - 甚至是迭代或saveAsObjectFile - 已有效映射的RDD的子集。

回答

2

FutureAction.cancel会导致失败(请参阅comment in JobWaiter.scala),因此您无法使用它来获取部分结果。我认为没有办法通过异步API来完成。

相反,您可能会在30分钟后停止处理输入。

val stopTime = System.currentTimeMillis + 30 * 60 * 1000 // 30 minutes from now. 
rdd.mapPartitions { partition => 
    if (System.currentTimeMillis < stopTime) partition.map { 
    // Process it like usual. 
    ??? 
    } else { 
    // Time's up. Don't process anything. 
    Iterator() 
    } 
} 

请记住,一旦所有的洗牌依存关系完成,这只会产生变化。 (即使30分钟过去,也不能停止洗牌。)

+0

这完全没有经过测试。让我知道它是否有效! –

+1

:-)我想所有的异步函数也是未经测试的,至少在生产站点。 – arivero