2017-02-17 137 views
1

在spark中,我希望能够并行处理多个数据帧。你可以在另一个Dataframe中嵌套Spark Dataframe吗?

我想要的方法是将数据框嵌套在父数据框中,但我不确定该语法或者是否可能。

例如我有以下2个dataframes: DF1:

+-----------+---------+--------------------+------+ 
|id   |asset_id |    date| text| 
+-----------+---------+--------------------+------+ 
|20160629025|  A1|2016-06-30 11:41:...|aaa...| 
|20160423007|  A1|2016-04-23 19:40:...|bbb...| 
|20160312012|  A2|2016-03-12 19:41:...|ccc...| 
|20160617006|  A2|2016-06-17 10:36:...|ddd...| 
|20160624001|  A2|2016-06-24 04:39:...|eee...| 

DF2:

+--------+--------------------+--------------+ 
|asset_id|  best_date_time| Other_fields| 
+--------+--------------------+--------------+ 
|  A1|2016-09-28 11:33:...|   abc| 
|  A1|2016-06-24 00:00:...|   edf| 
|  A1|2016-08-12 00:00:...|   hij| 
|  A2|2016-07-01 00:00:...|   klm| 
|  A2|2016-07-10 00:00:...|   nop| 

所以我想结合这产生这样的事情。

+--------+--------------------+-------------------+ 
|asset_id|     df1|    df2| 
+--------+--------------------+-------------------+ 
|  A1| [df1 - rows for A1]|[df2 - rows for A1]| 
|  A2| [df1 - rows for A2]|[df2 - rows for A2]| 

注意,我不希望加入或工会它们因为这将是非常稀疏的(其实我有大约30 dataframes,数千资产的每个几千行)。

然后我打算做这个groupByKey让我得到这样的事情,我可以调用一个函数:

[('A1', <pyspark.resultiterable.ResultIterable object at 0x2534310>), ('A2', <pyspark.resultiterable.ResultIterable object at 0x25d2310>)] 

我是新来的火花,从而不胜感激任何帮助。

回答

2

TL; DR无法嵌套DataFrames,但可以使用复杂类型。

在这种情况下,你可以例如(星火2.0或更高版本):

from pyspark.sql.functions import collect_list, struct 

df1_grouped = (df1 
    .groupBy("asset_id") 
    .agg(collect_list(struct("id", "date", "text")))) 

df2_grouped = (df2 
    .groupBy("asset_id") 
    .agg(collect_list(struct("best_date_time", "Other_fields")))) 

df1_grouped.join(df2_grouped, ["asset_id"], "fullouter") 

,但你必须知道:

  • 这是相当昂贵的。
  • 它具有有限的应用程序。一般来说,嵌套结构使用起来很麻烦,而且需要复杂而昂贵的(特别是在PySpark中)UDF。
+0

感谢您的有用指针。 – prk