2017-04-07 91 views
1

我在for循环中使用满足不同条件的不同查询超过1500次的查询缓存的配置单元临时表。我需要在循环内使用unionAll合并它们。但是,由于spark无法跟上RDD血统,我得到了stackoverflow错误。将大量火花数据帧合并为一个

伪代码:

df=[from a hive table] 
tableA=[from a hive table] 
tableA.registerTempTable("tableA") 
HiveContext.sql('CACHE TABLE tableA') 

for i in range(0,2000): 
    if (list[0]['column1']=='xyz'): 
     df1=query something from tableA 
     df=df.unionAll(df1) 
    elif(): 
     df1=query something from tableA 
     df=df.unionAll(df1) 
    elif(): 
     df1=query something from tableA 
     df=df.unionAll(df1) 
    elif(): 
     df1=query something from tableA 
     df=df.unionAll(df1) 
    else: 
     df1=query something from tableA 
     df=df.unionAll(df1) 

这将引发错误计算器由于RDD血统变硬。所以我尝试如下检查点:

for i in range(0,2000): 
    if (list[0]['column1']=='xyz'): 
     df1=query something from tableA 
     df=df.unionAll(df1) 
    elif(): 
     df1=query something from tableA 
     df=df.unionAll(df1) 
    else: 
     df1=query something from tableA 
     df=df.unionAll(df1) 
    df.rdd.checkpoint 
    df = sqlContext.createDataFrame(df.rdd, df.schema) 

我得到了同样的错误。所以我尝试了SaveAsTable,我一直想避免因为每个hql查询和配置单元io在循环内的作业提交滞后。但是这种方法运作良好。

for i in range(0,2000): 
    if (list[0]['column1']=='xyz'): 
     df=query something from tableA 
     df.write.saveAsTable('output', mode='append') 
    elif(): 
     df=query something from tableA 
     df.write.saveAsTable('output', mode='append') 

我需要帮助避免将数据帧保存到循环内的配置单元中。我想以某种内存和高效的方式合并dfs。我尝试过的其他选项之一是将查询结果直接插入临时表中,但出现错误:无法插入基于RDD的表中。

+1

一般来说,这种循环和联合操作总是会导致Spark出现问题。你正在运行什么类型的查询?也许有更巧妙的方式来重构你的代码,这不需要循环。另外,有什么条件? –

+0

条件不复杂 - 一些正则表达式匹配和一些直接整数匹配。但问题是,我已经让最终用户创建了这些条件,他们只能编写基于sql的条件并将它们导入spark以处理数据。天真地说,我的应用程序像SQL工作台一样工作 - 不同之处在于它运行所有查询并将结果存储在单个表中。 – Mike

回答

0

也许,结果的临时表将起作用。

df1="query something from tableA".registerTempTable("result") 
sqlContext.sql("Insert into result query something from tableA") 
+0

就像我在帖子中提到的那样,它会抛出一个错误:无法插入基于RDD的表。 – Mike