2015-10-07 123 views
2

我是Spark的新手,正在尝试读取csv文件并获取文件中的第一列和第二列。尽管如此,csv文件是巨大的,我不想解析csv文件中的每一行。另外,运行collect()函数可能会导致进程崩溃,因为内存可能不足以支持返回的数据量。所以我想知道是否可以用csv数据的一个子集创建一个RDD。例如,是否可以生成包含csv文件的第10至1000行的RDD并忽略其他行。如何获取csv文件的子集作为Spark RDD

现在,我只有

csvdata = sc.textFile("hdfs://nn:port/datasets/sample.csv").map(lambda line: line.split(",")) 

这基本上整个CSV文件创建一个RDD。是否可以从csvdata创建一个RDD,其中只包含10到1000行?

非常感谢您提供的帮助。

+0

http://stackoverflow.com/questions/15644859/how-to-read-specific-part-of-large-file-in-python – Ashalynd

回答

2

您可以通过索引加载所有和过滤器:

rdd = sc.parallelize(range(0, -10000, -1)) 
rdd.zipWithIndex().filter(lambda kv: 9 <= kv[1] < 999).keys() 

调整范围取决于你如何定义10日线。

+0

This Works。我需要在textFile()的回答中替换parallelize()函数,并且它工作正常。非常感谢。 – thisisshantzz

0

RDD不是存储在内存中的数据,而是有意对某些数据进行处理。当您调用终端操作时,如“收集”或“减少”,则Spark将处理数据。 Spark根据您在RDD上的操作历史记录,进行了一些巧妙的优化,从而限制了它必须完成的工作量。

(通过调用一个RDD一些操作,但是不能呼叫终端操作自己尝试一下。没有任何反应!)

所以,你可以做如(这是斯卡拉但是在Python不是太不相似)

val first10results: Array[Array[String]] = sc.textFile(filePath) 
     .map(f => f.split(",")) 
     .take(10) 

星火就知道了,因为take(10),您只需要前10行。所以它只会从文件中取10行!简单。