我正在使用wholeTextFiles
来读取目录中的每个文件。之后,我使用map
在rdd的每个元素上调用一个函数。整个程序仅使用每个文件的50行。代码如下:apache spark:从目录中读取大尺寸文件
def processFiles(fileNameContentsPair):
fileName= fileNameContentsPair[0]
result = "\n\n"+fileName
resultEr = "\n\n"+fileName
input = StringIO.StringIO(fileNameContentsPair[1])
reader = csv.reader(input,strict=True)
try:
i=0
for row in reader:
if i==50:
break
// do some processing and get result string
i=i+1
except csv.Error as e:
resultEr = resultEr +"error occured\n\n"
return resultEr
return result
if __name__ == "__main__":
inputFile = sys.argv[1]
outputFile = sys.argv[2]
sc = SparkContext(appName = "SomeApp")
resultRDD = sc.wholeTextFiles(inputFile).map(processFiles)
resultRDD.saveAsTextFile(outputFile)
目录中的每个文件的大小可以在我的情况非常大,因为这个原因使用wholeTextFiles
API的将是在这种情况下,效率不高。有没有有效的方法来做到这一点?我可以考虑逐个遍历目录中的每个文件,但这似乎效率不高。我是新来的火花。请让我知道是否有任何有效的方法来做到这一点。
每个文件的大小有多大?你不能把文件分割成更小的文件吗? –
@DatTran每个文件的大小可以是几Gbs,并且目录中的文件数量可以大于100.我认为可以将文件拆分的一种方法是逐个拆分每个文件,并从每个文件中取出第一个拆分文件并保留这些文件在临时目录中。之后,我们可以在该临时目录上应用'wholeTextFiles'。这是你建议分割文件的方式吗?如果没有,请让我知道你会建议如何拆分文件? – mcurious