我有一个Spark SQL将我的S3 JSON文件读入DataFrame中。Spark SQL read.json读取JSON输入两次
然后我在该DataFrame上运行2个SQL,并在执行每个SQL之前发现SparkSQL读取我的S3 JSON文件两次。
如果数据框对象不被重用,这将是非常昂贵的...
任何帮助表示赞赏。
这里是我的代码片段:
protected boolean doAggregations() throws IOException {
SQLContext sqlContext = getSQLContext();
DataFrame edgeDataFrame = sqlContext.read().json(sourceDataDirectory);
edgeDataFrame.cache();
getLogger().info("Registering and caching the table 'edgeData'");
edgeDataFrame.registerTempTable("edgeData");
String dateKey = DateTimeUtility.SECOND_FORMATTER.print(System.currentTimeMillis());
for (Map.Entry<String, AggregationMetadata> entry : aggMetadataMap.entrySet()) {
String aggName = entry.getKey();
String resultDir = getAggregationResultDirectory(aggName, dateKey);
String sql = entry.getValue().getSql();
// The input file(s) are being read again and again instead of operating on the "edgeDataFrame"
DataFrame dataFrame = sqlContext.sql(sql);
dataFrame.write().format("json").save(resultDir);
}
return true;
}