在我的Spark应用程序中,我想对循环中的数据帧执行操作并将结果写入hdfs。递归数据帧操作
伪代码:
var df = emptyDataframe
for n = 1 to 200000{
someDf=read(n)
df = df.mergeWith(somedf)
}
df.writetohdfs
在上面的例子中,我取得好成绩时, “mergeWith” 做了unionAll。但是,当在“mergeWith”中做一个(简单)加入时,作业变得非常慢(每个核心有2个执行器,每个核心大于1h)并且永远不会结束(作业会自行中止)。
在我的场景中,我用大约含有〜1mb的文本数据的文件进行了大约50次迭代。
因为合并顺序对我来说很重要,我怀疑这是由于DAG的产生,导致整个事情在我存储数据的那一刻运行。
现在我正试图在合并的数据框上使用一个.persist,但是这似乎也相当缓慢。
编辑:
由于作业运行,我注意到(尽管我做了计数和.persist)在内存中的数据帧看起来并不像一个静态的数据帧。它看起来像是它一直在做的所有合并的一条通路,有效地减缓了工作的线性。
我有权利承担var df
是这个的罪魁祸首吗?
击穿的问题,因为我看到它:
dfA = empty
dfC = dfA.increment(dfB)
dfD = dfC.increment(dfN)....
当我希望DF” AC和d为对象,不同的火花的东西,如果我坚持不关心或是否重新分配。 星火它看起来像这样:
dfA = empty
dfC = dfA incremented with df B
dfD = ((dfA incremented with df B) incremented with dfN)....
UPDATE2
为了摆脱持续的DF的我能“破发”的血统转换DF并RDD,然后再返回时,无法正常工作。 这有一点额外的开销,但是可以接受的(工作在几分钟内完成,而不是几个小时/从不) 我将对持久性进行一些更多的测试,并以解决方法的形式制定答案。
结果: 这似乎只能解决这些问题的表面。在现实中我又回到了起点,并得到OOM异常java.lang.OutOfMemoryError: GC overhead limit exceeded
我不清楚这个'mergeWith'函数应该做什么(你同时编写union和join)。你能否包含'mergeWith'的代码? –
'mergeWith'函数可以是很多事情,当它只是一个'union all'时,我会得到很好的结果。或者它可能是这样的: 'SELECT f。* FROM full f LEFT OUTER JOIN delta delta ON CONCAT(fa,fb)= CONCAT(ia,iz)WHERE CONCAT(ia,iz)is NULL UNION ALL SELECT d。* FROM delta d' – Havnar
做一个'union'和做'join'有很大的区别。对于'union',Spark只需要写入附加的数据,而对于'join'则必须将数据混洗。根据你的数据的大小,当然做一个'join'可能很容易出现OOM异常 - 特别是因为你正在运行在一个很小的群集上。 –