Spark: merge sets with common elements

I have a DataFrame that looks like this:
+-----------+-----------+
| Package | Addresses |
+-----------+-----------+
| Package 1 | address1 |
| Package 1 | address2 |
| Package 1 | address3 |
| Package 2 | address3 |
| Package 2 | address4 |
| Package 2 | address5 |
| Package 2 | address6 |
| Package 3 | address7 |
| Package 3 | address8 |
| Package 4 | address9 |
| Package 5 | address9 |
| Package 5 | address1 |
| Package 6 | address10 |
| Package 7 | address8 |
+-----------+-----------+
I need to find all addresses that were ever seen together across the different packages. Example output:
+----+------------------------------------------------------------------------+
| Id | Addresses |
+----+------------------------------------------------------------------------+
| 1 | [address1, address2, address3, address4, address5, address6, address9] |
| 2 | [address7, address8] |
| 3 | [address10] |
+----+------------------------------------------------------------------------+
So, I have this DataFrame. First I aggregate it by package (with combineByKey rather than a plain groupBy):
val rdd = packages.select($"package", $"address")
  .map(x => (x(0).toString, x(1).toString))
  .rdd
  .combineByKey(
    // createCombiner: start a set from the first address of a package
    (source: String) => Set[String](source),
    // mergeValue: add an address to the partition-local set
    (acc: Set[String], v: String) => acc + v,
    // mergeCombiners: union the sets built on different partitions
    (acc1: Set[String], acc2: Set[String]) => acc1 ++ acc2
  )
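For a quick sanity check of that step, here is a plain-Scala sketch (no Spark; the sample rows are a hypothetical subset of the table above) that builds the same per-package address sets:

```scala
// Local equivalent of the combineByKey step: one Set[String] per package.
val rows = Seq(
  ("Package 1", "address1"),
  ("Package 1", "address2"),
  ("Package 1", "address3"),
  ("Package 2", "address3"),
  ("Package 2", "address4")
)

// groupBy + map mirrors what combineByKey produces on the cluster
val perPackage: Map[String, Set[String]] =
  rows.groupBy(_._1).map { case (pkg, rs) => pkg -> rs.map(_._2).toSet }
```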
Then I merge the rows that have addresses in common:
val result = rdd.treeAggregate(
  Set.empty[Set[String]]
)(
  (map: Set[Set[String]], row) => {
    val vals = row._2
    val sets = map + vals
    // merge sets that share elements, adapted from https://stackoverflow.com/a/25623014/772249
    sets.foldLeft(Set.empty[Set[String]])((cum, cur) => {
      val (hasCommon, rest) = cum.partition(s => (s & cur).nonEmpty)
      rest + (cur ++ hasCommon.flatten)
    })
  },
  (map1, map2) => {
    val sets = map1 ++ map2
    // same merge step for combining partial results
    sets.foldLeft(Set.empty[Set[String]])((cum, cur) => {
      val (hasCommon, rest) = cum.partition(s => (s & cur).nonEmpty)
      rest + (cur ++ hasCommon.flatten)
    })
  },
  10
)
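That foldLeft is the whole merging step: for each incoming set, pull out every accumulated set that overlaps it and collapse them into one. A standalone sketch of just that function (plain Scala, toy data, predicate spelled out explicitly):

```scala
// Merge any sets that share at least one element (same logic as the
// foldLeft used inside treeAggregate above).
def mergeSets(sets: Set[Set[String]]): Set[Set[String]] =
  sets.foldLeft(Set.empty[Set[String]]) { (cum, cur) =>
    // split the accumulated sets into those that overlap `cur` and the rest
    val (hasCommon, rest) = cum.partition(s => (s & cur).nonEmpty)
    // collapse `cur` and everything overlapping it into a single set
    rest + (cur ++ hasCommon.flatten)
  }

val merged = mergeSets(Set(
  Set("address1", "address2", "address3"),
  Set("address3", "address4"),
  Set("address7", "address8")
))
```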
But no matter what I do, treeAggregate takes extremely long and I cannot finish a single task. The raw data size is about 250 GB. I have tried different clusters, but treeAggregate still takes too long.

Everything before the treeAggregate works fine, but it gets stuck after that.
I have tried different values of spark.sql.shuffle.partitions (the default, 2000, 10000), but it does not seem to matter. I have tried different depth values for treeAggregate, but did not notice any difference either.
Comments:
I'm not sure I understand what you are trying to do. Why not just do packages.groupBy("package").agg(collect_set("address"))? –
@AssafMendelson Because it would give me completely different results than what I need. Please look closely at the expected result. If I just group, I get 7 different rows, but I expect only three. – twoface88
@AssafMendelson Example: address4 and address1 belong together even though they are in different packages, because address3 was already seen in both package1 and package2. So all addresses from package1 and package2 belong to the same group, and so on. – twoface88
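The relation described in this last comment is exactly a connected-components grouping over addresses. A minimal union-find sketch (plain Scala, no Spark; addresses that share a package are unioned) reproduces the three expected groups from the sample table:

```scala
import scala.collection.mutable

// Union-find over addresses: all addresses that share a package are unioned,
// so transitively connected addresses end up under one root.
def components(pairs: Seq[(String, String)]): Set[Set[String]] = {
  val parent = mutable.Map[String, String]()
  def find(x: String): String = {
    val p = parent.getOrElseUpdate(x, x)
    if (p == x) x else { val r = find(p); parent(x) = r; r } // path compression
  }
  def union(a: String, b: String): Unit = parent(find(a)) = find(b)
  // union every address in a package with that package's first address
  pairs.groupBy(_._1).values.foreach { rs =>
    val addrs = rs.map(_._2)
    addrs.tail.foreach(union(addrs.head, _))
  }
  // group the distinct addresses by their component root
  pairs.map(_._2).distinct.groupBy(find).values.map(_.toSet).toSet
}

val groups = components(Seq(
  ("Package 1", "address1"), ("Package 1", "address2"), ("Package 1", "address3"),
  ("Package 2", "address3"), ("Package 2", "address4"), ("Package 2", "address5"),
  ("Package 2", "address6"), ("Package 3", "address7"), ("Package 3", "address8"),
  ("Package 4", "address9"), ("Package 5", "address9"), ("Package 5", "address1"),
  ("Package 6", "address10"), ("Package 7", "address8")
))
```

This runs in near-linear time locally; on 250 GB the same idea is usually expressed as a distributed connected-components computation rather than the pairwise set merging in treeAggregate.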