2016-11-22 77 views
2

在我的Spark应用程序中,我想对循环中的数据帧执行操作并将结果写入hdfs。递归数据帧操作

伪代码:

var df = emptyDataframe 
for n = 1 to 200000{ 
    someDf=read(n) 
    df = df.mergeWith(somedf) 
} 
df.writetohdfs 

在上面的例子中,我取得好成绩时, “mergeWith” 做了unionAll。但是,当在“mergeWith”中做一个(简单)加入时,作业变得非常慢(每个核心有2个执行器,每个核心大于1h)并且永远不会结束(作业会自行中止)。

在我的场景中,我用大约含有〜1mb的文本数据的文件进行了大约50次迭代。

因为合并顺序对我来说很重要,我怀疑这是由于DAG的产生,导致整个事情在我存储数据的那一刻运行。

现在我正试图在合并的数据框上使用一个.persist,但是这似乎也相当缓慢。

编辑:

由于作业运行,我注意到(尽管我做了计数和.persist)在内存中的数据帧看起来并不像一个静态的数据帧。它看起来像是它一直在做的所有合并的一条通路,有效地减缓了工作的线性。

我有权利承担var df是这个的罪魁祸首吗?

spiraling out of controle

击穿的问题,因为我看到它:

dfA = empty 
dfC = dfA.increment(dfB) 
dfD = dfC.increment(dfN).... 

当我希望DF” AC和d为对象,不同的火花的东西,如果我坚持不关心或是否重新分配。 星火它看起来像这样:

dfA = empty 
dfC = dfA incremented with df B 
dfD = ((dfA incremented with df B) incremented with dfN).... 

UPDATE2

为了摆脱持续的DF的我能“破发”的血统转换DF并RDD,然后再返回时,无法正常工作。 这有一点额外的开销,但是可以接受的(工作在几分钟内完成,而不是几个小时/从不) 我将对持久性进行一些更多的测试,并以解决方法的形式制定答案。

结果: 这似乎只能解决这些问题的表面。在现实中我又回到了起点,并得到OOM异常java.lang.OutOfMemoryError: GC overhead limit exceeded

+0

我不清楚这个'mergeWith'函数应该做什么(你同时编写union和join)。你能否包含'mergeWith'的代码? –

+0

'mergeWith'函数可以是很多事情,当它只是一个'union all'时,我会得到很好的结果。或者它可能是这样的: 'SELECT f。* FROM full f LEFT OUTER JOIN delta delta ON CONCAT(fa,fb)= CONCAT(ia,iz)WHERE CONCAT(ia,iz)is NULL UNION ALL SELECT d。* FROM delta d' – Havnar

+0

做一个'union'和做'join'有很大的区别。对于'union',Spark只需要写入附加的数据,而对于'join'则必须将数据混洗。根据你的数据的大小,当然做一个'join'可能很容易出现OOM异常 - 特别是因为你正在运行在一个很小的群集上。 –

回答

0

因此,以下是我最终使用的。它对我的用例来说足够高性能,它工作并且不需要持久化。

非常多的是解决方法而不是解决方法。

val mutableBufferArray = ArrayBuffer[DataFrame]() 
mutableBufferArray.append(hiveContext.emptyDataframe()) 

for loop { 

       val interm = mergeDataFrame(df, mutableBufferArray.last) 
       val intermSchema = interm.schema 
       val intermRDD = interm.rdd.repartition(8) 


       mutableBufferArray.append(hiveContext.createDataFrame(intermRDD, intermSchema)) 
       mutableBufferArray.remove(0) 

} 

我这是怎么搏斗钨为合规。 通过从DF进入RDD并返回,我最终得到了一个真实的物体,而不是从前到后整个钨生成的过程管道。

在我的代码中,在写入磁盘之前迭代几次(50-150次迭代看起来效果最好)。这就是我再次清除bufferArray以重新开始的地方。

0

如果你有这样的代码:

var df = sc.parallelize(Seq(1)).toDF() 

for(i<- 1 to 200000) { 
    val df_add = sc.parallelize(Seq(i)).toDF() 
    df = df.unionAll(df_add) 
} 

然后DF将有40万个分区之后,这使得以下行动效率低下(因为你有每个分区1个任务)。

尝试将分区数减少到例如200(使用例如df.coalesce(200).write.saveAsTable(....)

+0

我已经合并了输出,对不起,我没有提到。 这里的问题是当我更新数据框中的记录(所以不只是附加一个联合所有) 因此,运行时本身已经放慢了持续时间(试图不减速) 重新分区也没有帮助。 – Havnar

+0

澄清我的意思;坚持或 在迭代过程中重新分区......所有结果都是相同的结果(请参见问题图片),最后只需将问题移到需要写入HDFS的位置 – Havnar