2016-08-03 102 views
0

我目前正在探索Spark。我面临以下任务 - 获取RDD,根据特定条件对其进行分区,然后将多个文件写入S3存储桶中的不同文件夹中。Spark RDD foreach Partition to S3

一切都很好,直到我们来上传到S3部分。我已阅读所有关于这个问题的问题,并发现我可以使用AmazonS3ClientsaveToTextFile方法进行RDD。有两个问题,我面对:

  1. 如果我与AmazonS3Client去我得到一个java.io.NotSerializableException因为代码是从星火驱动程序发送给它需要序列化和显然的AmazonS3Client不支持工人那。

  2. 如果我去saveToTextFile我也遇到类似的问题。当我进入foreachPartition循环时,我需要获得Iterable[T](在这种情况下为p),所以如果我想使用saveToTextFile,则需要创建Iterable的RDD,因此需要创建parallelize。问题是SparkContext sc也(没错)不会序列化。

rdd.foreachPartition { p => sc.parallelize(p.toSeq).saveAsTextFile(s"s3n://") }

任何帮助将不胜感激。

回答

2

有没有必要这样做。你可以只用saveAsTextFile与RDD:

rdd.saveAsTextFile(s"s3n://dir/to/aux/file") 

saveAsTextFile将写入S3与文件的许多部分的文件夹中(如许多地区的分区)。然后你可以合并成一个文件,如果你想:

def mergeToS3(srcPath: String, dstPath: String, sc: SparkContext): Unit = { 
    val hadoopConfig = sc.hadoopConfiguration 
    val fs = FileSystem.get(new URI(srcPath), hadoopConfig) 
    FileUtil.copyMerge(fs, new Path(srcPath), fs, new Path(dstPath), true, hadoopConfig, null) 
    } 

    mergeToS3("s3n://dir/to/aux/file", "s3n://dir/to/singleFile",sc)