2016-05-17 79 views
2

我想检查一个新图(称为A)是否是其他图(称为B)的子图。我写了一个测试的小演示,但失败了!我跑只是火花外壳演示,火花版本1.6.1:如何使用Spark图形函数遮罩?

// Build the GraphB 
val usersB = sc.parallelize(Array(
    (3L, ("rxin", "student")), 
    (7L, ("jgonzal","postdoc")), 
    (5L, ("franklin", "prof")), 
    (2L, ("istoica", "prof")) 
)) 

val relationshipsB = sc.parallelize(Array(
    Edge(3L, 7L, "collab"), 
    Edge(5L, 3L, "advisor"), 
    Edge(2L, 5L, "colleague"), 
    Edge(5L, 7L, "pi") 
)) 

val defaultUser = ("John Doe", "Missing") 

val graphB = Graph(usersB, relationshipsB, defaultUser) 

// Build the initial Graph A 
val usersA = sc.parallelize(Array(
    (3L, ("rxin", "student")), 
    (7L, ("jgonzal", "postdoc")), 
    (5L, ("franklin", "prof")) 
)) 

val relationshipsA = sc.parallelize(Array(
    Edge(3L, 7L, "collab"), 
    Edge(5L, 3L, "advisor") 
)) 

val testGraphA = Graph(usersA, relationshipsA, defaultUser) 

//do the mask 
val maskResult = testGraphA.mask(graphB) 
maskResult.edges.count 
maskResult.vertices.count 

在我的理解API on spark website,屏蔽功能可按能得到所有相同的边和顶点。然而,结果是顶点只是正确的(maskResult.vertices.count = 3),边数应该是2但不是(maskResult.edges.count = 0)。

回答

2

如果你去看看the source,你会看到mask使用EdgeRDD.innerJoin。如果您在the documentation去找innerJoin,你会看到警告:

内加入本EdgeRDD与另一EdgeRDD,假设两个使用相同PartitionStrategy分区

您将需要创建并使用PartitionStrategy。如果你做到以下几点,它会得到你想要的结果(但可能不会很好地进行缩放):

object MyPartStrat extends PartitionStrategy { 
    override def getPartition(s: VertexId, d: VertexId, n: PartitionID) : PartitionID = { 
    1  // this is just to prove the point, you'll need a real partition strategy 
    } 
} 

然后,如果你这样做:

val maskResult = testGraphA.partitionBy(MyPartStrat).mask(graphB.partitionBy(MyPartStrat)) 

你会得到你想要的结果。但就像我说的,你可能需要找出一个更好的分区策略,而不是把所有东西都塞进一个分区。

+0

不错的答案。我只想补充一点,他可以选择可以在[这里]找到的预先打包的分区策略之一(http://spark.apache.org/docs/1.5.1/api/scala/index.html#org。 apache.spark.graphx.PartitionStrategy $)。所以,也许他不需要真正创建一个,他可以像'testGraphA.partitionBy(PartitionStrategy.CanonicalRandomVertexCut)' –

+1

好,稍后会添加到我的答案 –