0
我在Spark(v2.1.1)中有一个包含分层数据的3列(如下所示)的数据集。Apache Spark中的分层数据处理
- 我的目标的目标是增量编号分配给基础上,父子层次的每一行。从图形上可以说,分层数据是一个树的集合。
- 根据下表,我已经有基于'Global_ID'分组的行。现在我想以 的增量顺序生成'Value'列,但是基于 'Parent'和'Child'列的数据层次结构。
表格表示(数值是所需的输出):
+-----------+--------+-------+ +-----------+--------+-------+-------+
| Current Dataset | | Desired Dataset (Output) |
+-----------+--------+-------+ +-----------+--------+-------+-------+
| Global_ID | Parent | Child | | Global_ID | Parent | Child | Value |
+-----------+--------+-------+ +-----------+--------+-------+-------+
| 111 | 111 | 123 | | 111 | 111 | 111 | 1 |
| 111 | 135 | 246 | | 111 | 111 | 123 | 2 |
| 111 | 123 | 456 | | 111 | 123 | 789 | 3 |
| 111 | 123 | 789 | | 111 | 123 | 456 | 4 |
| 111 | 111 | 111 | | 111 | 111 | 135 | 5 |
| 111 | 135 | 468 | | 111 | 135 | 246 | 6 |
| 111 | 135 | 268 | | 111 | 135 | 468 | 7 |
| 111 | 268 | 321 | | 111 | 135 | 268 | 8 |
| 111 | 138 | 139 | | 111 | 268 | 321 | 9 |
| 111 | 111 | 135 | | 111 | 111 | 138 | 10 |
| 111 | 111 | 138 | | 111 | 138 | 139 | 11 |
| 222 | 222 | 654 | | 222 | 222 | 222 | 12 |
| 222 | 654 | 721 | | 222 | 222 | 987 | 13 |
| 222 | 222 | 222 | | 222 | 222 | 654 | 14 |
| 222 | 721 | 127 | | 222 | 654 | 721 | 15 |
| 222 | 222 | 987 | | 222 | 721 | 127 | 16 |
| 333 | 333 | 398 | | 333 | 333 | 333 | 17 |
| 333 | 333 | 498 | | 333 | 333 | 398 | 18 |
| 333 | 333 | 333 | | 333 | 333 | 498 | 19 |
| 333 | 333 | 598 | | 333 | 333 | 598 | 20 |
+-----------+--------+-------+ +-----------+--------+-------+-------+
树表示(期望值旁边的每个节点表示):
+-----+ +-----+
1 | 111 | 17 | 333 |
+--+--+ +--+--+
| |
+---------------+--------+-----------------+ +----------+----------+
| | | | | |
+--v--+ +--v--+ +--v--+ +--v--+ +--v--+ +--v--+
2 | 123 | 5 | 135 | 10 | 138 | | 398 | | 498 | | 598 |
+--+--+ +--+--+ +--+--+ +--+--+ +--+--+ +--+--+
+-----+-----+ +--------+--------+ | 18 19 20
| | | | | |
+--v--+ +--v--+ +--v--+ +--v--+ +--v--+ +--v--+
| 789 | | 456 | | 246 | | 468 | | 268 | | 139 | +-----+
+-----+ +-----+ +-----+ +-----+ +--+--+ +-----+ 12 | 222 |
3 4 6 7 8 | 11 +--+--+
+--v--+ |
| 321 | +------+-------+
+--+--+ | |
9 +--v--+ +--v--+
13 | 987 | 14 | 654 |
+--+--+ +--+--+
|
+--v--+
15 | 721 |
+--+--+
|
+--v--+
16 | 127 |
+--+--+
代码段:
Dataset<Row> myDataset = spark
.sql("select Global_ID, Parent, Child from RECORDS");
JavaPairRDD<Row,Long> finalDataset = myDataset.groupBy(new Column("Global_ID"))
.agg(functions.sort_array(functions.collect_list(new Column("Parent").as("parent_col"))),
functions.sort_array(functions.collect_list(new Column("Child").as("child_col"))))
.orderBy(new Column("Global_ID"))
.withColumn("vars", functions.explode(<Spark UDF>)
.select(new Column("vars"),new Column("parent_col"),new Column("child_col"))
.javaRDD().zipWithIndex();
// Sample UDF (TODO: Actual Implementation)
spark.udf().register("computeValue",
(<Column Names>) -> <functionality & implementation>,
DataTypes.<xxx>);
经过大量的调查研究,并通过博客,许多建议去,我曾尝试下面的方法,但无济于事,以实现我的方案的结果。
技术堆栈:
Apache的火花(V2.1。1)
爪哇8
AWS EMR集群(火花应用部署)
数据量:
- 大约〜Dataset中
20000000点方法下的行尝试:
星火GraphX + GraphFrames:
- 使用这种组合,我只能实现顶点和边之间的关系,但它不适合我的用例。
参考:https://graphframes.github.io/user-guide.html星火GraphX预凝胶API:
- 这是最接近我能得到实现预期的结果,但不幸的是我无法找到一个Java代码片断一样。 在其中一个博客中提供的示例是Scala,我不熟悉 。
参考:https://dzone.com/articles/processing-hierarchical-data-using-spark-graphx-pr
替代品的任何建议(或)在当前的方法修改将是很有益的,因为我搞清楚这个用例的解决方案完全丢失。
感谢您的帮助!谢谢!
这似乎是一个非常可行的解决方案。感谢您的代码!我肯定会试试这个,但只是有点关注'收藏列表',因为我的数据大约在2000万行左右,但它应该是好的。如果您可以提供与Scala代码完全相同的Java代码,那就太好了,因为我是Scala的新手。再次感谢! – Sridher
@Sridher我认为你可以很容易地将其转换为java代码。这里主要要注意的是你可以在java中复制的spark代码。 –
我将大部分代码转换为与Java相当的代码,但是面临着您使用过的Spark UDF的一些问题。你能帮我解决UDF吗?请参阅我编辑过的代码片段。 – Sridher