4

Combining / joining rows of IDs in Spark: given a file filled with rows of IDs such as

i1, i2, i5 
i3, i4 
i2, i6, i7 
i4, i8 
i9, i3 

How would you join them by linking on shared IDs? In the example above, row 1 links to row 3 via i2, and row 2 links to rows 4 and 5 via i4 and i3 respectively. That would give you the following (with duplicates removed):

i1, i2, i5, i6, i7 
i3, i4, i8, i9 

I can do this by looping over the rows, but I'm not sure how you would go about it in a functional way?

+2

Similar: http://stackoverflow.com/questions/40240409/apache-spark-rdd-substitution/40256149#40256149 – maasg

Answers

1

Since you are using Apache Spark, you can use the built-in GraphX component to do the work for you.

import org.apache.spark.graphx._ 

// Cross product: every (x, y) pair from the two collections.
def cross[Y](xs: Traversable[Y], ys: Traversable[Y]) = for { x <- xs; y <- ys } yield (x, y) 

val data = sc.parallelize(List(
    "1\t5\t3", 
    "3\t9\t30", 
    "7\t10\t12", 
    "10\t7\t13" 
)) 

// Parse each tab-separated line into a list of Long IDs.
val prep = data.map(x => x.split("\t").map(_.toLong).toList) 

// One vertex per ID, labelled with a display string.
val vertex = prep 
    .flatMap(x => x) 
    .map(x => x -> s"ID=$x") 

// An edge between every pair of IDs that appear on the same line.
val edges = prep 
    .map(x => cross(x, x)) 
    .flatMap(x => x) 
    .map(x => new Edge(x._1, x._2, "likes")) 

val graph = Graph(vertex, edges) 

// Group vertex IDs by the connected component they belong to.
val linked = graph 
    .connectedComponents 
    .vertices 
    .map(_.swap) 
    .groupByKey 

linked.take(10).foreach(println) 

which will print a result like the following:

(1,CompactBuffer(30, 3, 9, 1, 5)) 
(7,CompactBuffer(7, 10, 12, 13)) 

cross simply creates the cross product of the two lists so that we can create edges between all the vertices.
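
To make this concrete, here is a small plain-Scala sketch (using the first sample row, 1 5 3) of what cross produces before each pair is wrapped in an Edge:

val row = List(1L, 5L, 3L) 
// Every ordered pair of IDs from the same line, including self-pairs. 
val pairs = for { x <- row; y <- row } yield (x, y) 
// pairs == List((1,1), (1,5), (1,3), (5,1), (5,5), (5,3), (3,1), (3,5), (3,3)) 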

The connectedComponents function traverses the graph, finds all the vertices that share edges, and creates a new graph in which each vertex is a tuple of (vertex ID -> "primary" vertex ID).

So:

graph.connectedComponents.vertices.take(10).foreach(println) 

will print out:

(30,1) 
(1,1) 
(3,1) 
(5,1) 
(7,7) 
(9,1) 
(10,7) 
(12,7) 
(13,7) 

As you can see, 1 and 7 have been chosen as the "primary vertices" and are linked to all the vertices connected to them in the first graph. So a simple swap and groupByKey will group all the connected IDs together.
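
One caveat: GraphX vertex IDs must be Longs, while the question's rows use string IDs such as i1. A minimal sketch of one way to bridge that gap (my own assumption, not part of the original answer), using zipWithUniqueId to assign each distinct string ID a Long, could look like this:

// Sketch only: translate string IDs like "i1" into Long vertex IDs before building the graph. 
val rows = sc.parallelize(List( 
    List("i1", "i2", "i5"), 
    List("i3", "i4"), 
    List("i2", "i6", "i7"), 
    List("i4", "i8"), 
    List("i9", "i3"))) 

// One unique Long per distinct string ID. 
val idToLong = rows.flatMap(identity).distinct.zipWithUniqueId // RDD[(String, Long)] 
val lookup = idToLong.collectAsMap // assumes the set of IDs fits in driver memory 

// Rewrite each row in terms of Long IDs, then reuse the vertex/edge construction above. 
val numericRows = rows.map(_.map(lookup)) 

From there, numericRows can be fed through the same cross/Edge/connectedComponents pipeline, and idToLong.map(_.swap) can be used to translate the component members back into the original string IDs.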

Works with Spark
0

Instead of using an O(n*n) solution that loops through all the rows, we can use a solution that is O(n*k), where k is the number of your IDs. Like this:

var input = ... //I will assume your input is an RDD[List]

val idArray = Array(id1, id2, id3, id4, id5, id6, id7) //Array containing all IDs
var result = sc.parallelize(idArray, k).map(x => (x, x))
input = input.map(x => (x(0), if (x.length > 1) x.slice(1, x.length) else null))

//If you can afford to persist it would help greatly:
result.persist()
input.persist()

//We can make this loop smaller if k is large and your lists are small
//by setting the upper bound of the range to the length of the longest list.
//I'll leave this decision up to you.
for (i <- 0 to k) {
    result = result.cogroup(input)
    input = input.map { case (_, rest) => (rest(0), if (rest.length > 1) rest.slice(1, rest.length) else null) }
}
result.map { case (_, ids) => ids.distinct } //we want distinct lists in output

result.unpersist()
input.unpersist()
0

So this is probably suboptimal, but I figured it was worth posting regardless. It assumes your input file is small enough to hold in memory (since this is all vanilla Scala).

I decided to approach the problem by treating the given IDs as adjacencies in a graph, and then using BFS to list all of the connected components.

/* Input, can be read from file easily by splitting on ", " */ 
val lines = List(List("i1", "i2", "i5"), 
    List("i3", "i4"), 
    List("i2", "i6", "i7"), 
    List("i4", "i8"), 
    List("i9", "i3")) 

/* finds all sequential pairs */ 
val pairs = lines.flatMap(x => x.dropRight(1).zip(x.drop(1))) 

/* create an empty adjacency map: id -> (Set of adjacent vertices) */ 
val defMap = Map[String, Set[String]]().withDefaultValue(Set[String]()) 

/* populate the default map with the actual (symmetric) adjacencies */ 
val adjMap = pairs.foldLeft{defMap}(
    (acc, x) => acc + (x._1 -> (acc(x._1) + x._2)) + (x._2 -> (acc(x._2) + x._1))) 

/* BFS algo on map representation of graph */ 
def mapBFS(adjMap: Map[String, Set[String]]): List[List[String]] = 
{ 
    val v = adjMap.keys 
    var globalVisits = List[String]() 
    def BFS_r(elems: List[String], visited: List[List[String]]): List[String] = 
    { 
        val newNeighbors = elems.flatMap(adjMap(_)).filterNot(visited.flatten.contains).distinct 
        if (newNeighbors.isEmpty) 
            visited.flatten 
        else 
            BFS_r(newNeighbors, newNeighbors :: visited) 
    } 
    v.flatMap(x => { 
        if (globalVisits.contains(x)) 
            None 
        else 
        { 
            val vi: List[String] = BFS_r(List(x), List(List(x))) 
            globalVisits = globalVisits ++ vi 
            Some(vi) 
        } 
    }).toList 
} 
mapBFS(adjMap).foreach{println} 

This gives the following output:

List(i7, i1, i6, i2, i5) 
List(i8, i4, i3, i9) 
+0

I posted this solution on [Code Review](http://codereview.stackexchange.com/questions/147446/print-connected-components-scala) if you have some ideas on how to improve it. –

1

Code for Spark 2.0+

import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate
val df = spark.sparkContext.parallelize(
    List(
    "i1, i2, i5", 
    "i3, i4", 
    "i2, i6, i7", 
    "i4, i8") 
) 

//Group lines by token count (determined by the last occurrence of a comma)
val rddGroupByTokensCount = df.map(row => (row.lastIndexOf(','), row.split(", "))) 
    .groupBy(_._1) 

//Now gather all the tokens into a single place with flatMap and drop duplicates
val rddUniqueTokens = rddGroupByTokensCount.map(_._2.flatMap(_._2).toSet) 

//Print the unique tokens, grouped by the token count of each line
rddUniqueTokens.collect().map(println) 

Output:

Set(i5, i1, i7, i2, i6) 
Set(i3, i4, i8)