我在for循环中使用满足不同条件的不同查询超过1500次的查询缓存的配置单元临时表。我需要在循环内使用unionAll合并它们。但是,由于spark无法跟上RDD血统,我得到了stackoverflow错误。将大量火花数据帧合并为一个
伪代码:
df=[from a hive table]
tableA=[from a hive table]
tableA.registerTempTable("tableA")
HiveContext.sql('CACHE TABLE tableA')
for i in range(0,2000):
if (list[0]['column1']=='xyz'):
df1=query something from tableA
df=df.unionAll(df1)
elif():
df1=query something from tableA
df=df.unionAll(df1)
elif():
df1=query something from tableA
df=df.unionAll(df1)
elif():
df1=query something from tableA
df=df.unionAll(df1)
else:
df1=query something from tableA
df=df.unionAll(df1)
这将引发错误计算器由于RDD血统变硬。所以我尝试如下检查点:
for i in range(0,2000):
if (list[0]['column1']=='xyz'):
df1=query something from tableA
df=df.unionAll(df1)
elif():
df1=query something from tableA
df=df.unionAll(df1)
else:
df1=query something from tableA
df=df.unionAll(df1)
df.rdd.checkpoint
df = sqlContext.createDataFrame(df.rdd, df.schema)
我得到了同样的错误。所以我尝试了SaveAsTable,我一直想避免因为每个hql查询和配置单元io在循环内的作业提交滞后。但是这种方法运作良好。
for i in range(0,2000):
if (list[0]['column1']=='xyz'):
df=query something from tableA
df.write.saveAsTable('output', mode='append')
elif():
df=query something from tableA
df.write.saveAsTable('output', mode='append')
我需要帮助避免将数据帧保存到循环内的配置单元中。我想以某种内存和高效的方式合并dfs。我尝试过的其他选项之一是将查询结果直接插入临时表中,但出现错误:无法插入基于RDD的表中。
一般来说,这种循环和联合操作总是会导致Spark出现问题。你正在运行什么类型的查询?也许有更巧妙的方式来重构你的代码,这不需要循环。另外,有什么条件? –
条件不复杂 - 一些正则表达式匹配和一些直接整数匹配。但问题是,我已经让最终用户创建了这些条件,他们只能编写基于sql的条件并将它们导入spark以处理数据。天真地说,我的应用程序像SQL工作台一样工作 - 不同之处在于它运行所有查询并将结果存储在单个表中。 – Mike