2017-01-23 70 views
1

我是新来的火花和尝试学习。这是一个相当简单的问题,我有下面的代码来减少重复键w.r.t到他们的值。如何在Apache Spark中执行简单的reduceByKey?

数据帧都会有这样的价值观。

subject  object  

    node1  node5 
    node1  node6 
    node1  node7 
    node2  node5 
    node2  node7 

而且我希望他们能像这样减少。

subject  object  

    node1  [node5,node6,node7] 
    node2  [node5,node7] 

我能实现这个使用groupByKey方法,但我想在这里使用reduceByKey对此我无法理解什么是执行这一正确的语法。

这里是我的代码:

DataFrame records = Service.sqlCtx().sql("SELECT subject,object FROM Graph"); 


    JavaPairRDD<String,Iterable<String>> rows = records.select("subject","object").toJavaRDD().mapToPair(
      new PairFunction<Row,String,String>(){ 

       @Override 
       public Tuple2<String, String> call(Row row) throws Exception { 
        return new Tuple2<String, String>(row.getString(0), row.getString(1)); 
       } 

      // this can be optimized if we use reduceByKey instead of groupByKey 
    }).distinct().groupByKey().cache(); 

回答

0
  • 在一般情况下,这不能与reduceByKey优化。效率低下的部分是操作不是特定的实现。
  • 此外,这不能直接与reduceByKey实现由于不相容签名。这可以通过aggregateByKeycombineByKey完成,但它仍然不是优化。
  • 最后,如果你使用DataFrames只需使用collect_list

    import static org.apache.spark.sql.functions.*; 
    
    records.groupBy("subject").agg(collect_list(col("object"))); 
    
-1

有我们可以应用reduceByKey优化的方式,但我们必须改造1前reduceByKey。

val keyValuePairs = sc.parallelize(List(("node1","node5"),("node1","node6"),("node1","node7"),("node2","node5"),("node2","node7"))) //Input 

val mappedKV = keyValuePairs.map(x => (x._1,Seq(x._2))) 

//Transform each value of the K,V pair to 'Seq' (extra transformation) 

val reducedKV = mappedKV.reduceByKey(_++_) 

然后施加 '++' 与reduceByKey。

输出:

阶> reducedKV.collect

数组[(字符串,SEQ [字符串])] =阵列((节点2,列表(节点5,node7)),(节点1,列表(节点5,node6,node7)))

相关问题