2017-01-09 54 views
0

我是Spark新手,在本地机器上执行一些(〜)小任务。 我的任务如下:我在S3中存储了365个压缩的csv文件,其中包含每日日志。我想要构建一整年的数据框架。我的方法是从存储桶中检索密钥,建立日常数据框,将它们统一到月份数据框中,为它们做同样的事情,并获得全年数据框作为回报。在火花数据框上应用动作时出错

这对我为检测而检索的一些样本数据起了作用。在我构建DataFrame之前,我解压缩文件,将未压缩的csv文件写入磁盘,并用它创建DataFrame。

问题:如果我从磁盘(使临时)删除csv文件,创建数据框后,我无法对数据帧执行任何操作(例如year_df.count())。抛出Spark.exception:

“作业已中止因故障阶段:.... java.io.FileNotFoundException: 的.csv文件不存在”

一些搜索后在SO,我发现原因可能是Spark在使用DataFrames(External Table not getting updated from parquet files written by spark streaming)应用SQL查询时使用的MetaData。我改变了

spark.sql.parquet.cacheMetadata

运行spark = SparkSession.builder.config("spark.sql.parquet.cacheMetadata", "false").getOrCreate()。确保spark.conf.get("spark.sql.parquet.cacheMetadata")返回false ..

找不到任何解决方案。当然,将所有文件解压缩到S3中都可以使用,但这对我没有用处。

谢谢!

回答

0

Spark以懒惰的方式执行动作。

这意味着,如果您可以进行少量转换,但只有在调用操作时才会读取文件。

它以同样的方式工作,对数据集在RDDS,因为数据集是由RDDS

充分考虑代码的支持:

val df = sqlContext.read // read file 
val query = df.groupBy('x).count() 

query.show() // here the data will be read 

所以,如果你删除文件时,它会读取之前,然后星火意志抛出异常。您可以通过执行一些操作强制阅读,即takeshow。这将被缓存,如果你会做cache()

val df = sqlContext.read // read file 
val query = df.groupBy('x).count().cache() 

query.show() // here the data will be read and cached 
+0

嗨,谢谢你的回复。所以,如果我理解正确,我可以安全地删除源文件(我的。csv在这种情况下)只有在我缓存我感兴趣的查询后,并调用一个操作来计算它们。所以,我有两个选择:将未压缩的CSV文件保存到我的S3以便进行持续访问,或者直接读取gzip文件进行触发,并在一些转换后创建DataFrame。正确? – Mike

+0

@Mike Flow:读 - >缓存 - >一些动作 - >删除文件应该没问题 - 除非你发现一些不寻常的Spark行为;)你是正确的,我会保留文件在S3中,直到处理完成 - 万一出现故障,你将有机会重新处理文件 –

0

缓存()仍然只是一个提示; Spark可能需要在出现故障时重新计算值,或者仅缓存的数据由于缓存压力而被丢弃。如果你想删除源数据,那么确保你已经写出了你的结果,并且真的再也不需要数据了。

我实际上建议将CSV转换为任何列式格式(ORC,Parquet)&用Snappy压缩。处理效率更高,尤其是谓词下推