2015-07-19 144 views
1

我有一个Spark SQL将我的S3 JSON文件读入DataFrame中。Spark SQL read.json读取JSON输入两次

然后我在该DataFrame上运行2个SQL,并在执行每个SQL之前发现SparkSQL读取我的S3 JSON文件两次。

如果数据框对象不被重用,这将是非常昂贵的...

任何帮助表示赞赏。

这里是我的代码片段:

protected boolean doAggregations() throws IOException { 

    SQLContext sqlContext = getSQLContext(); 

    DataFrame edgeDataFrame = sqlContext.read().json(sourceDataDirectory); 


    edgeDataFrame.cache(); 

    getLogger().info("Registering and caching the table 'edgeData'"); 
    edgeDataFrame.registerTempTable("edgeData"); 

    String dateKey = DateTimeUtility.SECOND_FORMATTER.print(System.currentTimeMillis()); 

    for (Map.Entry<String, AggregationMetadata> entry : aggMetadataMap.entrySet()) { 
     String aggName = entry.getKey(); 
     String resultDir = getAggregationResultDirectory(aggName, dateKey); 
     String sql = entry.getValue().getSql(); 
     // The input file(s) are being read again and again instead of operating on the "edgeDataFrame" 
     DataFrame dataFrame = sqlContext.sql(sql); 
     dataFrame.write().format("json").save(resultDir); 
    } 
    return true; 
} 

回答

0

您的JSON文件被读出两次,因为星火不知道JSON的架构和SQL需要一个已知的模式。因此,Spark采用了两步法:

  1. 发现所有JSON记录的模式为每个JSON记录模式的联合。

  2. 将数据加载到适当配置的数据结构中。

想象一下,你有简单的一行JSON文件:

{"category" : "A", "num" : 5} 

如果您在spark-shell执行

sqlContext.read.json(path).saveAsTable("test")

你会发现两遍。

第一遍有一个映射阶段,它收集每个分区发现的模式,reduce阶段将模式组合为所有分区的联合模式。

对于map阶段,你会看到类似这样的:

​​

对于减少阶段,你会看到类似这样的:

INFO SparkContext: Starting job: reduce at JsonRDD.scala:54 

之后,当模式是已知的, JSON数据的实际加载将开始。这只会涉及映射阶段,因为一旦发现模式,就不需要在分区处理器之间共享信息。

你可以看到星火如何对待在日志中的数据列:

INFO ColumnChunkPageWriteStore: written 56B for [category] BINARY: 1 values, 11B raw, 29B comp, 1 pages, encodings: [PLAIN, RLE, BIT_PACKED] 
INFO ColumnChunkPageWriteStore: written 70B for [num] INT64: 1 values, 14B raw, 29B comp, 1 pages, encodings: [PLAIN, RLE, BIT_PACKED]