我目前正在探索Spark。我面临以下任务 - 获取RDD,根据特定条件对其进行分区,然后将多个文件写入S3存储桶中的不同文件夹中。Spark RDD foreach Partition to S3
一切都很好,直到我们来上传到S3部分。我已阅读所有关于这个问题的问题,并发现我可以使用AmazonS3Client
或saveToTextFile
方法进行RDD。有两个问题,我面对:
如果我与
AmazonS3Client
去我得到一个java.io.NotSerializableException
因为代码是从星火驱动程序发送给它需要序列化和显然的AmazonS3Client不支持工人那。如果我去
saveToTextFile
我也遇到类似的问题。当我进入foreachPartition
循环时,我需要获得Iterable[T]
(在这种情况下为p
),所以如果我想使用saveToTextFile
,则需要创建Iterable的RDD,因此需要创建parallelize
。问题是SparkContextsc
也(没错)不会序列化。
rdd.foreachPartition { p => sc.parallelize(p.toSeq).saveAsTextFile(s"s3n://") }
任何帮助将不胜感激。