2016-11-23

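How to compute the average degree of neighbours with GraphX

I want to compute the average neighbour degree for each node in a graph. Say we have a graph like this: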

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Create an RDD for vertices (`sc` is the SparkContext, e.g. in spark-shell)
val users: RDD[(VertexId, String)] =
  sc.parallelize(Array(
    (3L, "rxin"),
    (7L, "jgonzal"),
    (5L, "franklin"),
    (2L, "istoica")))
// Create an RDD for edges
val relationships: RDD[Edge[Int]] = sc.parallelize(
  Array(
    Edge(3L, 7L, 12),
    Edge(5L, 3L, 1),
    Edge(2L, 5L, 3),
    Edge(5L, 7L, 5)))
// Build the initial Graph
val graph = Graph(users, relationships)

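EDIT: To give an idea of the expected result, take node 5 and its neighbours:

  • node 3 has degree = 2
  • node 7 has degree = 2
  • node 2 has degree = 1

The output of this metric is simply the average degree of node 5's neighbours: (2 + 2 + 1) / 3 ≈ 1.667

Ideally you would want to remove the links to node 5 from this computation, but that doesn't really matter to me for now...

END EDIT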
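The metric in the edit can be checked on the four example edges without Spark. This is a minimal plain-Scala sketch, not GraphX code; the object `AvgNeighbourDegree` and its members are hypothetical names, and edges are treated as undirected, the way GraphX's `degrees` counts both in- and out-edges. It also computes one possible reading of "remove the links to node 5": each neighbour's degree is counted without its edge back to the vertex.

```scala
// Plain-Scala check of the average neighbour degree on the question's edges (no Spark).
object AvgNeighbourDegree {
  // Edges from the question; direction is ignored.
  val edges: Seq[(Long, Long)] = Seq((3L, 7L), (5L, 3L), (2L, 5L), (5L, 7L))

  // Adjacency lists: every edge is recorded in both directions.
  val neighbours: Map[Long, Seq[Long]] =
    edges
      .flatMap { case (a, b) => Seq((a, b), (b, a)) }
      .groupBy(_._1)
      .map { case (v, pairs) => (v, pairs.map(_._2)) }

  // Degree = number of incident edges.
  val degree: Map[Long, Int] = neighbours.map { case (v, ns) => (v, ns.size) }

  // Mean degree of each vertex's neighbours.
  val avgNeighbourDegree: Map[Long, Double] =
    neighbours.map { case (v, ns) => (v, ns.map(degree).sum.toDouble / ns.size) }

  // Variant ignoring the edge back to v: each neighbour u contributes deg(u) - 1.
  // Assumes a simple graph (no parallel edges).
  val avgExcludingLinkBack: Map[Long, Double] =
    neighbours.map { case (v, ns) => (v, ns.map(u => degree(u) - 1).sum.toDouble / ns.size) }

  def main(args: Array[String]): Unit =
    avgNeighbourDegree.toSeq.sortBy(_._1).foreach(println)
}
```

For vertex 5, `avgNeighbourDegree` gives 5/3, the value worked out in the edit, and the variant gives 2/3. Without parallel edges the variant is always the plain average minus 1, so dropping the link back does not change how the vertices rank against each other.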

I tried to use aggregateMessages, but I don't know how to get the degree of each node while I'm inside the aggregateMessages call:

val neideg = g.aggregateMessages[(Long, Double)](
    triplet => { 
     val comparedAttrs = compareAttrs(triplet.dstAttr, triplet.srcAttr) // BUT HERE I SHOULD GIVE ALSO THE DEGREE 
     triplet.sendToDst(1L, comparedAttrs) 
     triplet.sendToSrc(1L, comparedAttrs) 
    }, 
    { case ((cnt1, v1), (cnt2, v2)) => (cnt1 + cnt2, v1 + v2) }) 

val aveneideg = neideg.mapValues(kv => kv._2/kv._1.toDouble).toDF("id", "aveneideg") 

Then I have the following function, which sums the attributes:

def compareAttrs(xs: (Int, String), ys: (Int, String)): Double = { 
    xs._1.toDouble + ys._1.toDouble 
} 

How can I also pass the degrees of those nodes into compareAttrs?

Of course, I'd be more than happy to see a simpler, smarter solution to this task than the one I'm trying to craft...

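I would do a DFS on each connected component and keep track of the neighbours along the way. Then divide that number by the number of nodes. – Xenwar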

Thanks for your time, but this reply doesn't really help – user299791

Answer

I'm not sure this is what you're after, but here is something you could go with:

import org.apache.spark.graphx._

val degrees: VertexRDD[Int] = graph.degrees
// now we have a graph where the attribute of each vertex is its degree
// (vertices without edges are missing from `degrees`, so they default to 0)
val graphWithDegrees = graph.outerJoinVertices(degrees) { (_, _, optDegree) =>
  optDegree.getOrElse(0)
}

// now each vertex sends its degree to its neighbours;
// we aggregate them in a List, so each vertex gets the degrees
// of all its neighbours (a Set would collapse equal degrees)
val neighboursDegreeAndCount = graphWithDegrees.aggregateMessages[List[Long]](
  sendMsg = triplet => {
    val srcDegree = triplet.srcAttr
    val dstDegree = triplet.dstAttr
    triplet.sendToDst(List(srcDegree.toLong))
    triplet.sendToSrc(List(dstDegree.toLong))
  },
  mergeMsg = (x, y) => x ++ y
).mapValues(ds => ds.sum / ds.size.toDouble)

// now if you want it in the original graph you can do
// outerJoinVertices again; the attr of each vertex is then
// the average degree of its neighbours (0.0 if it has none)
val graphWithAvgDegree = graph.outerJoinVertices(neighboursDegreeAndCount) { (_, _, optAvgDegree) =>
  optAvgDegree.getOrElse(0.0)
}

So for your example the output is: Array((5,1.6666666666666667), (2,3.0), (3,2.5), (7,2.5))
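The answer collects every neighbour degree into a `List`, which grows with the vertex's degree. Since only the mean is needed, the `(count, sum)` pair from the question's attempt is enough once the degree is stored in the vertex attribute. The sketch below imitates the `sendMsg` / `mergeMsg` / `mapValues` steps in plain Scala on the question's edges; it is not GraphX code, and the object name `CountSumMessages` is hypothetical. In GraphX the analogous (untested) call would be `graphWithDegrees.aggregateMessages[(Long, Double)]` with `triplet.sendToDst((1L, triplet.srcAttr.toDouble))` and `triplet.sendToSrc((1L, triplet.dstAttr.toDouble))`.

```scala
// Plain-Scala imitation of aggregateMessages with a (count, sum) message (not GraphX itself).
object CountSumMessages {
  val edges: Seq[(Long, Long)] = Seq((3L, 7L), (5L, 3L), (2L, 5L), (5L, 7L))

  // Vertex degrees, standing in for the attributes of graphWithDegrees.
  val degree: Map[Long, Int] =
    edges.flatMap { case (a, b) => Seq(a, b) }.groupBy(identity).map { case (v, occ) => (v, occ.size) }

  // sendMsg: each edge sends (1, degree of the other endpoint) to both endpoints.
  val messages: Seq[(Long, (Long, Double))] =
    edges.flatMap { case (src, dst) =>
      Seq((dst, (1L, degree(src).toDouble)), (src, (1L, degree(dst).toDouble)))
    }

  // mergeMsg: same shape as the question's merge, adding counts and degree totals.
  val merge: ((Long, Double), (Long, Double)) => (Long, Double) = {
    case ((cnt1, v1), (cnt2, v2)) => (cnt1 + cnt2, v1 + v2)
  }

  // mapValues step: divide the summed degrees by the number of neighbours.
  val avgNeighbourDegree: Map[Long, Double] =
    messages.groupBy(_._1).map { case (v, msgs) =>
      val (cnt, total) = msgs.map(_._2).reduce(merge)
      (v, total / cnt)
    }
}
```

Each message is a fixed-size pair, so merging costs the same no matter how many neighbours a vertex has, unlike concatenating lists.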

I don't understand the result for node 5: why is the average degree 1.5 and not 1.666? Thanks! – user299791

@Ipiepiora I edited the question to better explain what I'm trying to accomplish – user299791

@user299791 Sorry, that was a silly mistake in my code: I was using a `Set` instead of a `List`. Check the code again. – lpiepiora
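The gap between 1.5 and 1.666 comes from the collection used in `mergeMsg`: node 5's neighbours 3, 2 and 7 have degrees 2, 1 and 2, and merging single-element `Set`s with `++` keeps the value 2 only once. A small plain-Scala illustration; the object `SetVsList` and its members are hypothetical names made up for this example.

```scala
// Why a Set-based merge gave 1.5 for node 5 while a List-based merge gives 5/3.
object SetVsList {
  def average(xs: Iterable[Long]): Double = xs.sum.toDouble / xs.size

  // One single-element message per neighbour of node 5 (degrees of nodes 3, 2, 7).
  val listMessages: List[List[Long]] = List(List(2L), List(1L), List(2L))
  val setMessages: List[Set[Long]] = List(Set(2L), Set(1L), Set(2L))

  // mergeMsg = (x, y) => x ++ y on Lists keeps every degree: (2 + 1 + 2) / 3.
  val withList: Double = average(listMessages.reduce(_ ++ _))

  // The same merge on Sets drops the duplicate 2: (2 + 1) / 2.
  val withSet: Double = average(setMessages.reduce(_ ++ _))
}
```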