2017-06-21 59 views
1

我有以下结构化查询:将持续数据帧重复计算多次?

val A = 'load somedata from HDFS'.persist(StorageLevel.MEMORY_AND_DISK_SER) 
val B = A.filter('condition 1') 
val C = A.filter('condition 2') 
val D = A.filter('condition 3') 
val E = A.filter('condition 4') 
val F = A.filter('condition 5') 
val G = A.filter('condition 6') 
val H = A.filter('condition 7') 

val I = B.union(C).union(D).union(E).union(F).union(G).union(H) 

我坚持数据帧A,使得当我使用B/C/d/E/F/G/H时,A数据帧应当只计算一旦?但是,这项工作的DAG低于:

enter image description here

从上面的DAG,似乎阶段6-12的所有执行和数据帧A被计算的7倍?

为什么会发生这种情况?

也许DAG只是假的?我发现阶段7-12的顶部没有行,其中阶段6确实有两行来自其他阶段

我没有列出所有的操作。在union操作之后,我将I数据帧保存到HDFS。对I数据帧执行此操作会使持久操作真的完成吗?或者我必须在A数据帧上执行动作操作(例如count)以在重新使用A数据帧之前触发持久操作?

回答

2

做下面的行不会持续你的数据集。与数据集API一起使用时,所以你必须使用触发操作count或类似的,反过来提交星火工作缓存

val A = 'load somedata from HDFS'.persist(StorageLevel.MEMORY_AND_DISK_SER) 

缓存/持久性是懒惰。

以下所有运营商后,filter包括,应使用InMemoryTableScan在计划中的绿点(如下图所示)。

enter image description here

在你的情况,甚至union后的数据集I没有被缓存,因为你还没有触发缓存(而仅仅是将其标记为缓存)。

联合操作后,我将I数据帧保存到HDFS。对I数据帧执行此操作会使持久操作真的完成吗?

是的。只有操作(如保存到外部存储)才能触发持久性以供将来重用。

或者我必须做一个动作操作,如计数在A数据帧触发持久化操作,然后再使用A数据帧?

这就是重点!在你的情况下,因为你想在filter运营商中重复使用A数据帧,你应该首先persistcount(触发缓存),然后是filter

在你的情况下,没有filter将受益于persist的性能增加。persist实际上对性能没有任何影响,只是让代码审查人员认为它不是。

如果您想查看您的数据集何时被缓存,您可以在网络用户界面中检出存储选项卡或询问CacheManager

val nums = spark.range(5).cache 
nums.count 
scala> spark.sharedState.cacheManager.lookupCachedData(nums) 
res0: Option[org.apache.spark.sql.execution.CachedData] = 
Some(CachedData(Range (0, 5, step=1, splits=Some(8)) 
,InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
    +- *Range (0, 5, step=1, splits=8) 
)) 
+0

数据框是否像维护数据库一样维护数据行? – BDR