2016-02-18 27 views
0

我正在使用运行在云中的Spark。我的存储不是传统的HDFS,但我通过URL连接到我的文件。例如,我可以执行以下操作,Spark将接收目录中的所有文件。我也可以使用任何其他HDFS函数调用。Spark:按不同顺序处理文件然后返回

sc.TextFile("hdfs://mystorage001/folder/month1/*")

我的数据分布5个不同的驱动器之间,我想火花循环赛每个驱动器读取之间,这样我就可以从所有5并行读取。我目前可以执行以下操作并处理所有数据,但不会并行读取驱动器。相反,spark会读取一个驱动器中的所有文件,然后移至下一个。

sc.TextFile("hdfs://mystorage001/folder/month1/*, hdfs://mystorage002/folder/month2/*, hdfs://mystorage003/folder/month3/*, hdfs://mystorage004/folder/month4/*,hdfs://mystorage005/folder/month5/*")

我有100个执行者。所以我也尝试过这个,但是这会给我带来最差的表现。

sc.TextFile("hdfs://mystorage001/folder/month1/*, 20) union sc.TextFile("hdfs://mystorage002/folder/month2/*, 20) union sc.TextFile("hdfs://mystorage003/folder/month3/*, 20) union sc.TextFile("hdfs://mystorage004/folder/month4/*, 20) union sc.TextFile("hdfs://mystorage005/folder/month5/*")

我知道,在每个目录已经我命名的文件000000_0,000001_0,000002_0,等等......所以,如果我可以命令的文件列表火花由名字的那部分内容,我觉得这将完成我想要的,但唯一的办法是我已经想出了如何返回列表的方法是通过wholeTextFile(),无论如何都需要首先加载所有数据。

+0

http://stackoverflow.com/a/32978688/1560062,http://stackoverflow.com/q/11342400/1560062 – zero323

回答

0
import org.apache.hadoop.fs._ 
val fs = FileSystem.get(sc.hadoopConfiguration) 
val path = Array(new Path("hdfs://mystorage001/folder/month1/"), 
new Path("hdfs://mystorage001/folder/month2/"), 
new Path("hdfs://mystorage001/folder/month3/"), 
new Path("hdfs://mystorage001/folder/month4/"), 
new Path("hdfs://mystorage001/folder/month5/")) 
path.map(fs.listStatus(_)).flatMap(_.zipWithIndex).sortBy(_._2).map(_._1.getPath).mkString(",")