2017-07-14 73 views
0

我正在使用Spark实现以下逻辑。将数据帧拆分成更小的数据帧并将大数据帧推送给所有执行者?

  1. 获取50K行表的结果。
  2. 获取另一张表格(约30K行)。
  3. 对于(1)和(2)之间的所有组合,做一些工作并获得价值。

将(2)的数据帧推送到所有执行程序和分区(1)并在每个执行程序上运行每个部分?如何实现它?

val getTable(t String) = 
    sqlContext.read.format("jdbc").options(Map(
    "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver", 
    "url" -> jdbcSqlConn, 
    "dbtable" -> s"$t" 
)).load() 
    .select("col1", "col2", "col3") 

val table1 = getTable("table1") 
val table2 = getTable("table2") 

// Split the rows in table1 and make N, say 32, data frames 
val partitionedTable1 : List[DataSet[Row]] = splitToSmallerDFs(table1, 32) // How to implement it? 

val result = partitionedTable1.map(x => { 
    val value = doWork(x, table2) // Is it good to send table2 to executors like this? 
    value 
}) 

问:

  1. 如何破解大数据帧分成小的数据帧? (重新分区?)
  2. 向这样的执行器发送table2(传递一个大数据帧作为参数)是否很好?

回答

1

如何破解大数据帧分成小的数据帧? (再分配?)

简单的答案是肯定的repartion可以是一个解决办法。

问题可能是,将数据帧重新分区到更小的分区改善整体操作?

数据帧已经在本质上分布了。这意味着您在数据框上执行的操作(如连接,groupBy,聚合,函数等等)都在数据所在的位置执行。但如加入操作,GROUPBY,在需要洗牌聚集,重新分区将是无效的

  1. GROUPBY操作将打乱数据帧,使得不同的群体将是相同的执行人。

  2. partitionBy在窗口函数执行方式GROUPBY

  3. 联接操作将在相同的方式混洗数据相同。

是好送表2(通过一个大的数据帧作为参数)这样的执行者?

它不是很好地传递数据帧。由于您正在传输数据帧,所以table2对执行者不可见。

我会建议你使用broadcast variable

你可以做如下

val table2 = sparkContext.broadcast(getTable("table2")) 
val result = partitionedTable1.map(x => { 
    val value = doWork(x, table2.value) 
    value 
}) 
+0

感谢。你的意思是'val result = table1.map(x => {'(not'partitionedTable1')在你的答案中代码的第二行?由于数据框会默认分发给所有执行者?不需要手动分割它 – ca9163d9

+0

nope。那不是我的意思,我建议使用dataframe作为广播变量,并在其他函数中访问它,而不是将数据帧作为参数传递。 –

+0

我需要'splitToSmallerDFs()'来将'table1'拆分为'partitionedTable1:List [DataSet [Row]]'?是否将'table1.map(...)'的执行分配给所有执行者? – ca9163d9