Combining/joining lines of IDs in Spark. Given a file filled with rows such as:
i1, i2, i5
i3, i4
i2, i6, i7
i4, i8
i9, i3
how would you join them by linking identical IDs? For the example above, row 1 links to row 3 via i2, and row 2 links to rows 4 and 5 via i4 and i3 respectively. This would give you the following (duplicates removed):
i1, i2, i5, i6, i7
i3, i4, i8, i9
I could do this by looping over the lines, but I don't know how you would go about it in a functional way.
When you use Apache Spark you can use the built-in GraphX component to do the work for you.
import org.apache.spark.graphx._
def cross[Y](xs: Traversable[Y], ys: Traversable[Y]) = for { x <- xs; y <- ys } yield (x, y)
val data = sc.parallelize(List(
"1\t5\t3",
"3\t9\t30",
"7\t10\t12",
"10\t7\t13"
))
val prep = data.map(x => x.split("\t").map(_.toLong).toList)
// one vertex per ID, labelled with the ID itself
val vertex = prep
  .flatMap(x => x)
  .map(x => x -> s"ID=$x")
// an edge between every pair of IDs that appear on the same line
val edges = prep
  .map(x => cross(x, x))
  .flatMap(x => x)
  .map(x => new Edge(x._1, x._2, "likes"))
val graph = Graph(vertex, edges)
// connectedComponents maps every vertex to its component's "master" vertex;
// swapping and grouping then collects the IDs of each component together
val linked = graph
  .connectedComponents
  .vertices
  .map(_.swap)
  .groupByKey
linked.take(10).foreach(println)
which will print out a result like:
(1,CompactBuffer(30, 3, 9, 1, 5))
(7,CompactBuffer(7, 10, 12, 13))
cross simply creates the cross product of the two lists so that we can create edges between all of the vertices.
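For illustration (not part of the original answer), this is roughly what cross produces for one of the prepared lines:
// pairs every ID on a line with every other ID on the same line (including itself)
cross(List(1L, 5L, 3L), List(1L, 5L, 3L))
// => List((1,1), (1,5), (1,3), (5,1), (5,5), (5,3), (3,1), (3,5), (3,3))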
The connectedComponents function will traverse the graph, find all the vertices that share an edge, and create a new graph where each vertex is a tuple of (vertex Id -> "master" vertex Id).
So:
graph.connectedComponents.vertices.take(10).foreach(println)
will print out:
(30,1)
(1,1)
(3,1)
(5,1)
(7,7)
(9,1)
(10,7)
(12,7)
(13,7)
As you can see, 1 and 7 have been elected as the "master vertices" and are linked to all the connected vertices of the first graph. So a simple swap and group will collect all the connected IDs together.
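The sample above uses numeric IDs directly. If your IDs are strings like i1, i2, ... as in the question, you first need to map them to Long values, because GraphX keys its vertices by Long. A minimal sketch of such a mapping (the idToIndex/indexOf names and the zipWithIndex approach are my own, not part of the original answer):
// assumed string input, formatted as in the question
val rawLines = sc.parallelize(List("i1, i2, i5", "i3, i4", "i2, i6, i7", "i4, i8", "i9, i3"))
val tokens = rawLines.map(_.split(", ").toList)
// assign a unique Long index to every distinct string ID
val idToIndex = tokens.flatMap(identity).distinct().zipWithIndex()  // RDD[(String, Long)]
val indexOf = idToIndex.collectAsMap()  // assumed small enough to collect to the driver
val prepFromStrings = tokens.map(_.map(indexOf))  // RDD[List[Long]], usable in place of prep above
The (String, Long) pairs in idToIndex can then be used to translate the connected components back to the original string IDs.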
This works with Spark, and instead of using an O(n*n) solution that loops through all the lines, we can use a solution that is O(n*k), where k is the number of your IDs. Like so:
var input = ...  // I will assume your input is an RDD[List]
val idArray = Array(id1, id2, id3, id4, id5, id6, id6)  // Array containing all IDs
var result = sc.parallelize(idArray, k).map(x => (x, x))
input = input.map(x => (x(0), if (x.length > 1) x.slice(1, x.length) else null))
// If you can afford to persist it would help greatly:
result.persist
input.persist
// We can make this loop smaller if k is large and your lists are small,
// by setting the upper bound of the range to the length of the longest list.
// I'll leave this decision up to you.
for (i <- 0 to k) {
  result = result.cogroup(input)
  input = input.map { case (_, ys) => (ys(0), if (ys.length > 1) ys.slice(1, ys.length) else null) }
}
val output = result.map { case (_, ys) => ys.distinct }  // we want distinct lists in the output
result.unpersist
input.unpersist
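As a side note (my addition, with made-up keys purely for illustration), a single cogroup step on two pair RDDs collects, for each key, the values from both sides:
val left = sc.parallelize(Seq(("i2", "i2"), ("i4", "i4")))
val right = sc.parallelize(Seq(("i2", List("i6", "i7")), ("i4", List("i8"))))
// for each key, cogroup keeps the values from both RDDs
left.cogroup(right).collect().foreach(println)
// (i2,(CompactBuffer(i2),CompactBuffer(List(i6, i7))))
// (i4,(CompactBuffer(i4),CompactBuffer(List(i8))))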
So this is probably suboptimal, but I figured it would be worth posting regardless. It assumes that your input file is small enough to be held in memory (since this is all vanilla Scala).
I decided to approach the problem by treating the given ids as adjacencies in a graph, and then using BFS to list all the connected components.
/* Input, can be read from file easily by splitting on ", " */
val lines = List(List("i1", "i2", "i5"),
List("i3", "i4"),
List("i2", "i6", "i7"),
List("i4", "i8"),
List("i9", "i3"))
/* finds all sequential pairs */
val pairs = lines.flatMap(x => x.dropRight(1).zip(x.drop(1)))
/* create an empty adjacency map: id -> (Set of adjacent vertices) */
val defMap = Map[String, Set[String]]().withDefaultValue(Set[String]())
/* populate the default map with the actual (symmetric) adjacencies */
val adjMap = pairs.foldLeft{defMap}(
(acc, x) => acc + (x._1 -> (acc(x._1) + x._2)) + (x._2 -> (acc(x._2) + x._1)))
/* BFS algo on map representation of graph */
def mapBFS(adjMap: Map[String, Set[String]]): List[List[String]] =
{
val v = adjMap.keys
var globalVisits = List[String]()
def BFS_r(elems: List[String], visited: List[List[String]]): List[String] =
{
val newNeighbors = elems.flatMap(adjMap(_)).filterNot(visited.flatten.contains).distinct
if (newNeighbors.isEmpty)
visited.flatten
else
BFS_r(newNeighbors, newNeighbors :: visited)
}
v.flatMap(x =>{
if (globalVisits.contains(x))
None
else
{
val vi: List[String] = BFS_r(List(x), List(List(x)))
globalVisits = globalVisits ++ vi
Some(vi)
}
}).toList
}
mapBFS(adjMap).foreach{println}
This gives the following output:
List(i7, i1, i6, i2, i5)
List(i8, i4, i3, i9)
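If the input really lives in a file, the hard-coded lines value above can be built by reading the file and splitting each line on ", " (a small sketch; the ids.txt filename is assumed):
import scala.io.Source
// read each line of the file and split it on ", " to obtain a List[List[String]]
val lines = Source.fromFile("ids.txt").getLines().map(_.split(", ").toList).toList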
I posted this solution on Code Review (http://codereview.stackexchange.com/questions/147446/print-connected-components-scala) if you have some ideas on how it could be improved.
Code for Spark 2.0+:
import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder.master("local").getOrCreate()
val df = spark.sparkContext.parallelize(
List(
"i1, i2, i5",
"i3, i4",
"i2, i6, i7",
"i4, i8")
)
// Group lines by token count (determined by the index of the last comma)
val rddGroupByTokensCount = df.map(row => (row.lastIndexOf(','), row.split(", ")))
.groupBy(_._1)
// Now gather all the tokens in one place with flatMap and drop duplicates
val rddUniqueTokens = rddGroupByTokensCount.map(_._2.flatMap(_._2).toSet)
// Print the unique tokens, grouped by the token count of each line
rddUniqueTokens.collect().map(println)
Output:
Set(i5, i1, i7, i2, i6)
Set(i3, i4, i8)
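For clarity (my addition), the grouping key here is simply the index of the last comma in each raw line, so lines with the same number of tokens end up in the same group:
"i1, i2, i5".lastIndexOf(',')  // 6 -> grouped with "i2, i6, i7"
"i3, i4".lastIndexOf(',')      // 2 -> grouped with "i4, i8"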
Similar: http://stackoverflow.com/questions/40240409/apache-spark-rdd-substitution/40256149#40256149 – maasg