2015-11-10 84 views
7

我想在包含avro文件的一些生成的S3路径上运行spark工作(spark v1.5.1)。我正在加载它们:如何让火花忽略缺少的输入文件?

val avros = paths.map(p => sqlContext.read.avro(p)) 

虽然有些路径不存在。我如何能够忽略那些空洞的路径?以前我使用过this answer,但我不确定如何在新的数据帧API中使用它。

注意:我理想的是寻找类似的方法来链接答案,只是使输入路径可选。我不是特别想要在S3中明确检查路径的存在(因为这很麻烦并且可能会使开发变得笨拙),但是如果现在没有干净的方法来实现这一点,我想这就是我的后备。

回答

9

我会使用scala Try类型来处理在读取avro文件目录时出现故障的可能性。使用'尝试'我们可以使我们的代码中显式失败的可能性,并以功能方式处理:

object Main extends App { 

    import scala.util.{Success, Try} 
    import org.apache.spark.{SparkConf, SparkContext} 
    import com.databricks.spark.avro._ 

    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("example")) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

    //the first path exists, the second one doesn't 
    val paths = List("/data/1", "/data/2") 

    //Wrap the attempt to read the paths in a Try, then use collect to filter 
    //and map with a single partial function. 
    val avros = 
    paths 
     .map(p => Try(sqlContext.read.avro(p))) 
     .collect{ 
     case Success(df) => df 
     } 
    //Do whatever you want with your list of dataframes 
    avros.foreach{ df => 
    println(df.collect()) 
    } 
    sc.stop() 
} 
+0

从spark文档:'collect()'可能导致驱动程序内存不足,尽管如此,因为collect()会将整个RDD提取到一台机器上。有没有不使用'collect()'的解决方案?这是一个非常大的数据集。 – jbrown

+2

当在RDD上调用collect()时,情况就是如此。在我第一次调用'collect(...)'的地方,包含一个部分函数,​​它在一个RDD列表上,它是List上的collect函数,而不是任何RDD。这相当于做一个'map'和'filter'。我最后在'foreach'的末尾再次使用'collect()',但这只是操作RDD列表的一个例子,我不认为这是你在自己的应用程序中会做的事情,但是我会需要一个简单的结尾来看看这种方法是否正常工作。 – mattinbits

+0

哦,好的。我会试一试,看看它是否有效。我认为第一个“collect”正在评估RDD并将所有数据发送到驱动程序节点。 – jbrown