2016-01-21 85 views
9

我已经创建了两个来自Hive表(PC_ITM和ITEM_SELL)并且大小很大的数据帧,并且我经常在SQL查询中使用那些 通过注册为表。但由于那些大,因此我花了很多时间获取查询结果。因此,我将它们保存为parquet文件,然后将它们读取并注册为临时表。但仍然得不到良好性能,因此我已广播了这些数据帧,然后注册了它们如下表所示。如何访问Spark中的广播数据帧

PC_ITM_DF=sqlContext.parquetFile("path") 
val PC_ITM_BC=sc.broadcast(PC_ITM_DF) 
val PC_ITM_DF1=PC_ITM_BC 
PC_ITM_DF1.registerAsTempTable("PC_ITM") 

ITM_SELL_DF=sqlContext.parquetFile("path") 
val ITM_SELL_BC=sc.broadcast(ITM_SELL_DF) 
val ITM_SELL_DF1=ITM_SELL_BC.value 
ITM_SELL_DF1.registerAsTempTable(ITM_SELL) 


sqlContext.sql("JOIN Query").show 

但是我还是无法实现它与这些数据帧未被广播的时间相同的性能。

谁能告诉如果这是广播和使用它的正确方法?`

回答

0

我会缓存RDDS在内存中。下一次需要时,spark会从内存中读取RDD,而不是每次都从头开始生成RDD。这里有一个快速入门的链接docs

val PC_ITM_DF = sqlContext.parquetFile("path") 
PC_ITM_DF.cache() 
PC_ITM_DF.registerAsTempTable("PC_ITM") 

val ITM_SELL_DF=sqlContext.parquetFile("path") 
ITM_SELL_DF.cache() 
ITM_SELL_DF.registerAsTempTable("ITM_SELL") 
sqlContext.sql("JOIN Query").show 

rdd.cache()是rdd.persist(StorageLevel.MEMORY_ONLY)的简写。有几种持久性级别可以选择,因为您的数据太大而无法使用内存持久性。这里是一个list of persistence options.如果你想从缓存中手动删除RDD,你可以拨打rdd.unpersist()

如果您喜欢广播数据。在播放之前,您必须先将其收集在驱动程序中。这要求您的RDD适合您的驱动程序(和执行程序)的内存。

+1

这不回答原来的问题,这就是如何广播数据帧。如果您不止一次加载它,即重复使用,坚持只会有所帮助。加入两个分布式数据集时无助于此。 –

+0

@KirkBroadhurst他指出数据很大并且经常使用 –

+0

@AlexNaspo稍微偏离原始问题:'RDD适合内存',因此这意味着我无法播放数据,直到我可以将其收集到驱动程序的主内存中?我通常使用自己的笔记本电脑作为大型集群上的驱动程序和主设备/从设备。那么这是我很快可能面临的限制吗? –

10

你并不需要'访问'广播数据帧 - 你只是使用它,而Spark将在后台实现广播。 broadcast function很好地工作,并且更有意义的是sc.broadcast方法。

如果您一次评估所有内容,可能很难理解在哪里花费时间。

您可以将代码分解为多个步骤。这里的关键是执行一个动作,并在您将用于连接之前继续使用您想要广播的数据帧

// load your dataframe 
PC_ITM_DF=sqlContext.parquetFile("path") 

// mark this dataframe to be stored in memory once evaluated 
PC_ITM_DF.persist() 

// mark this dataframe to be broadcast 
broadcast(PC_ITM_DF) 

// perform an action to force the evaluation 
PC_ITM_DF.count() 

这样做将确保数据帧是

  • 加载到内存中(持续)
  • 注册为临时表用于您的SQL查询
  • 标记为广播,这样会运送给所有执行者

当您现在运行sqlContext.sql("JOIN Query").show时,您应该看到'broadc ast hash join“放在Spark UI的SQL选项卡中。

+1

广播RDD有什么好处? RDD代表弹性分布式数据集。广播消除了RDD的分布式特性。我可以看到将RDD的数据收集到内存中并进行广播的用例。我不相信这是可能的。如果你看看这篇文章(http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love它说:“要广播一个RDD,你需要首先在驱动程序节点上收集()它。”你有没有在实践或测试中使用过这个? –

+4

@AlexNaspo是的,我一直都在使用它。好处是数据在所有节点上完全可用 - 不再分配 - 这可以提高加入时的性能。例如,考虑一个DataFrame,其中包含美国的每个人及其邮政编码,然后是包含邮政编码 - >州的表。加入这些需要大量的洗牌。将相对较小的zip->状态数据帧广播到所有节点,无需洗牌。 –

+1

您正在广播保存在内存中的数据帧,而不是分布式的。那是对的吗? Spark建议在您的数据中添加一个分区器以减少加入时的随机数量。 @kirkbroadhurt –

0

此时您无法访问SQL查询中的广播数据帧。您只能通过数据帧使用已展开的数据帧。

参见:https://issues.apache.org/jira/browse/SPARK-16475

+0

现在的解决方案是首先在dataframe api中广播df或表,然后注册从'broadcast'返回的值函数作为临时表,然后在SQL查询中调用该临时表。 – piggybox