2016-11-09

Answer

You can achieve this with a bit of SQL.

Let's assume you have the following graph:

import org.apache.spark.graphx._ 
import org.apache.spark.rdd.RDD 

// Create an RDD for the vertices 
val v: RDD[(VertexId, String)] = 
    sc.parallelize(Array((1L, "car1"), (2L, "car2"), 
         (3L, "car3"), (4L, "person1"), (5L, "person2"))) 
// Create an RDD for edges 
val e: RDD[Edge[Int]] = 
    sc.parallelize(Array(Edge(4L, 1L, 1), Edge(4L, 2L, 1), 
         Edge(5L, 1L, 1))) 


val graph = Graph(v,e) 

Now extract the edges and vertices into DataFrames:

val vDf = graph.vertices.toDF("vId", "vName") 
val eDf = graph.edges.toDF("person", "car", "attr") 

Then transform the data into the desired output:

import org.apache.spark.sql.functions.collect_set 

eDf.drop("attr") 
  .join(vDf, 'person === 'vId).drop("vId", "person").withColumnRenamed("vName", "person") 
  .join(vDf, 'car === 'vId).drop("car", "vId") 
  .groupBy("person") 
  .agg(collect_set('vName)).toDF("person", "car") 
  .show() 


+-------+------------+ 
| person|   car| 
+-------+------------+ 
|person2|  [car1]| 
|person1|[car2, car1]| 
+-------+------------+ 
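For comparison, here is a rough sketch of the same grouping done with plain RDD joins instead of DataFrames. It assumes the `v` and `e` RDDs defined above; the intermediate names (`byCar`, `result`) are illustrative, not part of the original answer:

```scala
// Alternative sketch: group cars per person using only RDD joins.
// e.srcId is the person vertex id, e.dstId the car vertex id.
val byCar = e.map(edge => (edge.srcId, edge.dstId)) // (personId, carId)
  .join(v)                                          // (personId, (carId, personName))
  .map { case (_, (carId, personName)) => (carId, personName) }

val result = byCar
  .join(v)                                          // (carId, (personName, carName))
  .map { case (_, (personName, carName)) => (personName, carName) }
  .groupByKey()
  .mapValues(_.toSet)

result.collect().foreach(println)
```

This avoids the DataFrame conversion entirely, at the cost of doing the renames and drops by hand in the `map` steps.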