2017-12-27 318 views
0

我在Spark(v2.1.1)中有一个包含分层数据的3列(如下所示)的数据集。Apache Spark中的分层数据处理

  • 我的目标的目标是增量编号分配给基础上,父子层次的每一行。从图形上可以说,分层数据是一个树的集合。
  • 根据下表,我已经有基于'Global_ID'分组的行。现在我想以 的增量顺序生成'Value'列,但是基于 'Parent'和'Child'列的数据层次结构。

表格表示(数值是所需的输出):

+-----------+--------+-------+   +-----------+--------+-------+-------+ 
    |  Current Dataset  |   |  Desired Dataset (Output)  | 
    +-----------+--------+-------+   +-----------+--------+-------+-------+ 
    | Global_ID | Parent | Child |   | Global_ID | Parent | Child | Value | 
    +-----------+--------+-------+   +-----------+--------+-------+-------+ 
    |  111 | 111 | 123 |   |  111 | 111 | 111 |  1 | 
    |  111 | 135 | 246 |   |  111 | 111 | 123 |  2 | 
    |  111 | 123 | 456 |   |  111 | 123 | 789 |  3 | 
    |  111 | 123 | 789 |   |  111 | 123 | 456 |  4 | 
    |  111 | 111 | 111 |   |  111 | 111 | 135 |  5 | 
    |  111 | 135 | 468 |   |  111 | 135 | 246 |  6 | 
    |  111 | 135 | 268 |   |  111 | 135 | 468 |  7 | 
    |  111 | 268 | 321 |   |  111 | 135 | 268 |  8 | 
    |  111 | 138 | 139 |   |  111 | 268 | 321 |  9 | 
    |  111 | 111 | 135 |   |  111 | 111 | 138 | 10 | 
    |  111 | 111 | 138 |   |  111 | 138 | 139 | 11 | 
    |  222 | 222 | 654 |   |  222 | 222 | 222 | 12 | 
    |  222 | 654 | 721 |   |  222 | 222 | 987 | 13 | 
    |  222 | 222 | 222 |   |  222 | 222 | 654 | 14 | 
    |  222 | 721 | 127 |   |  222 | 654 | 721 | 15 | 
    |  222 | 222 | 987 |   |  222 | 721 | 127 | 16 | 
    |  333 | 333 | 398 |   |  333 | 333 | 333 | 17 | 
    |  333 | 333 | 498 |   |  333 | 333 | 398 | 18 | 
    |  333 | 333 | 333 |   |  333 | 333 | 498 | 19 | 
    |  333 | 333 | 598 |   |  333 | 333 | 598 | 20 | 
    +-----------+--------+-------+   +-----------+--------+-------+-------+ 

树表示(期望值旁边的每个节点表示):

     +-----+           +-----+ 
        1 | 111 |          17 | 333 | 
         +--+--+           +--+--+ 
         |             | 
     +---------------+--------+-----------------+   +----------+----------+ 
     |      |     |   |   |   | 
     +--v--+     +--v--+   +--v--+  +--v--+ +--v--+ +--v--+ 
    2 | 123 |    5 | 135 |  10 | 138 |  | 398 | | 498 | | 598 | 
     +--+--+     +--+--+   +--+--+  +--+--+ +--+--+ +--+--+ 
    +-----+-----+   +--------+--------+  |   18   19   20 
    |   |   |  |  |  | 
+--v--+  +--v--+ +--v--+ +--v--+ +--v--+ +--v--+ 
| 789 |  | 456 | | 246 | | 468 | | 268 | | 139 |     +-----+ 
+-----+  +-----+ +-----+ +-----+ +--+--+ +-----+    12 | 222 | 
    3   4   6  7  8 |  11     +--+--+ 
             +--v--+        | 
             | 321 |      +------+-------+ 
             +--+--+      |    | 
              9      +--v--+  +--v--+ 
                   13 | 987 | 14 | 654 | 
                    +--+--+  +--+--+ 
                        | 
                       +--v--+ 
                      15 | 721 | 
                       +--+--+ 
                        | 
                       +--v--+ 
                      16 | 127 | 
                       +--+--+ 

代码段:

Dataset<Row> myDataset = spark 
       .sql("select Global_ID, Parent, Child from RECORDS"); 

JavaPairRDD<Row,Long> finalDataset = myDataset.groupBy(new Column("Global_ID")) 
    .agg(functions.sort_array(functions.collect_list(new Column("Parent").as("parent_col"))), 
     functions.sort_array(functions.collect_list(new Column("Child").as("child_col")))) 
    .orderBy(new Column("Global_ID")) 
    .withColumn("vars", functions.explode(<Spark UDF>) 
    .select(new Column("vars"),new Column("parent_col"),new Column("child_col")) 
    .javaRDD().zipWithIndex(); 


// Sample UDF (TODO: Actual Implementation) 
spark.udf().register("computeValue", 
       (<Column Names>) -> <functionality & implementation>, 
       DataTypes.<xxx>); 

经过大量的调查研究,并通过博客,许多建议去,我曾尝试下面的方法,但无济于事,以实现我的方案的结果。

技术堆栈:

  • Apache的火花(V2.1。1)

  • 爪哇8

  • AWS EMR集群(火花应用部署)


数据量:

  • 大约〜Dataset中

20000000点方法下的行尝试:

  1. 星火GraphX + GraphFrames:

  2. 星火GraphX预凝胶API:


替代品的任何建议(或)在当前的方法修改将是很有益的,因为我搞清楚这个用例的解决方案完全丢失。

感谢您的帮助!谢谢!

回答

0

注意:下面的解决方案是scala spark。您可以轻松转换为Java代码。

检查了这一点。我试着用Spark Sql来做这件事,你可以得到一个想法。基本上的想法是在对它们进行聚合和分组的同时对孩子,父母和全球身份进行排序。一旦按globalid进行分组和排序,则展开其余部分。你会得到有序的结果表到以后你可以zipWithIndex添加等级(值)

import org.apache.spark.sql.SQLContext 
    import org.apache.spark.sql.functions._ 
    import org.apache.spark.sql.expressions.UserDefinedFunction 
    import org.apache.spark.sql.functions.udf 

    val sqlContext = new SQLContext(sc) 
    import sqlContext.implicits._ 

    val t = Seq((111,111,123), (111,111,111), (111,123,789), (111,268,321), (222,222,654), (222,222,222), (222,721,127), (333,333,398), (333,333,333), (333,333,598)) 
    val ddd = sc.parallelize(t).toDF 
    val zip = udf((xs: Seq[Int], ys: Seq[Int]) => xs zip ys) 
    val dd1 = ddd 
    .groupBy($"_1") 
    .agg(sort_array(collect_list($"_2")).as("v"), 
     sort_array(collect_list($"_3")).as("w")) 
    .orderBy(asc("_1")) 
    .withColumn("vars", explode(zip($"v", $"w"))) 
    .select($"_1", $"vars._1", $"vars._2").rdd.zipWithIndex 

    dd1.collect 

输出

res24: Array[(org.apache.spark.sql.Row, Long)] = Array(([111,111,111],0), ([111,111,123],1), ([111,123,321],2), 
([111,268,789],3), ([222,222,127],4), ([222,222,222],5), ([222,721,654],6),([333,333,333],7), ([333,333,398],8), ([333,333,598],9)) 
+0

这似乎是一个非常可行的解决方案。感谢您的代码!我肯定会试试这个,但只是有点关注'收藏列表',因为我的数据大约在2000万行左右,但它应该是好的。如果您可以提供与Scala代码完全相同的Java代码,那就太好了,因为我是Scala的新手。再次感谢! – Sridher

+0

@Sridher我认为你可以很容易地将其转换为java代码。这里主要要注意的是你可以在java中复制的spark代码。 –

+0

我将大部分代码转换为与Java相当的代码,但是面临着您使用过的Spark UDF的一些问题。你能帮我解决UDF吗?请参阅我编辑过的代码片段。 – Sridher