2017-05-08 53 views
0

我正在使用wholeTextFiles来读取目录中的每个文件。之后,我使用map在rdd的每个元素上调用一个函数。整个程序仅使用每个文件的50行。代码如下:apache spark:从目录中读取大尺寸文件

def processFiles(fileNameContentsPair): 
    fileName= fileNameContentsPair[0] 
    result = "\n\n"+fileName 
    resultEr = "\n\n"+fileName 
    input = StringIO.StringIO(fileNameContentsPair[1]) 
    reader = csv.reader(input,strict=True) 

    try: 
     i=0 
     for row in reader: 
     if i==50: 
      break 
     // do some processing and get result string 
     i=i+1 
    except csv.Error as e: 
    resultEr = resultEr +"error occured\n\n" 
    return resultEr 
    return result 



if __name__ == "__main__": 
    inputFile = sys.argv[1] 
    outputFile = sys.argv[2] 
    sc = SparkContext(appName = "SomeApp") 
    resultRDD = sc.wholeTextFiles(inputFile).map(processFiles) 
    resultRDD.saveAsTextFile(outputFile) 

目录中的每个文件的大小可以在我的情况非常大,因为这个原因使用wholeTextFiles API的将是在这种情况下,效率不高。有没有有效的方法来做到这一点?我可以考虑逐个遍历目录中的每个文件,但这似乎效率不高。我是新来的火花。请让我知道是否有任何有效的方法来做到这一点。

+1

每个文件的大小有多大?你不能把文件分割成更小的文件吗? –

+0

@DatTran每个文件的大小可以是几Gbs,并且目录中的文件数量可以大于100.我认为可以将文件拆分的一种方法是逐个拆分每个文件,并从每个文件中取出第一个拆分文件并保留这些文件在临时目录中。之后,我们可以在该临时目录上应用'wholeTextFiles'。这是你建议分割文件的方式吗?如果没有,请让我知道你会建议如何拆分文件? – mcurious

回答

1

好吧,我建议将您的文件先拆分成更小的块,几个千兆字节太大而无法读取,这是您延迟的主要原因。如果你的数据在HDFS上,你可以为每个文件提供64MB的内容。否则,您应该尝试使用文件大小,因为它取决于您拥有的执行程序的数量。所以如果你有更多的小块,你可以增加这个来获得更多的并行性。同样,你也可以增加你的分区来调整它,因为你的processFiles函数似乎不是CPU密集型的。许多执行程序唯一的问题是I/O增加,但是如果文件很小,应该不是什么大问题。

顺便说一下,不需要临时目录,wholeTextFiles支持像*这样的通配符。另外请注意,如果您将S3用作文件系统,那么如果您的小文件过多,则可能会出现瓶颈,因为读取可能需要一段时间而不是大文件。所以这不是微不足道的。

希望这会有所帮助!