2017-06-20 41 views
0

我还是Spark/PySpark的新手,并且有以下问题。我买了ID's嵌套页清单:Spark/PySpark:由任何嵌套列表项组

result = [[411, 44, 61], [42, 33], [1, 100], [44, 42]] 

我正尝试实现的事情是,如果子表的任何项目在另一个子列表中的项目相匹配的双方应该合并。结果应如下所示:

merged_result = [[411, 44, 61, 42, 33, 44, 42], [1,100]] 

“result”中的第一个列表与第四个列表匹配。第四个列表与第二个列表匹配,因此所有3个列表应合并成一个列表。第三个列表与其他列表不匹配,所以它保持不变。

我可以通过用Python编写循环来实现这一点。

result_after_matching = [] 
for i in result: 
    new_list = i 
    for s in result: 
     if any(x in i for x in s): 
      new_list = new_list + s 
    result_after_matching.append(set(new_list)) 

#merged_result = [[411, 44, 61, 42], [42,33,44], [1, 100], [44,42,33,411,61]] 

由于这不是期望的输出,我需要重复循环,再做另一组()公开了“merged_result”)

set([[411,44,61,42,33], [42,33,44,411,61],[1,100], [44,42,33,411,61]]) 
-> [[411, 44, 61, 42, 33], [1,100]] 

随着名单的名单,和子列表获取随着新数据的传入,时间越来越大,这不会是使用的功能。

任何人都可以告诉我,如果有一个函数,在火花/ Pyspark,匹配/合并/ groupby /减少这些嵌套列表更容易和更快?

非常感谢! MG

回答

2

大多数基于rdd或基于数据帧的解决方案可能效率很低。这是因为您的问题的性质要求您的数据集的每个元素都可能多次与其他元素进行比较。这使得在整个集群中分配工作效率最低。

也许一个不同的方式来做到这一点将是重新这个图形问题。如果将列表中的每个项目视为图上的节点,并将每个列表视为子图,则从子图构建的父图的连通组件将成为所需的结果。下面是使用networkx包在python一个例子:

import networkx as nx 

result = [[411, 44, 61], [42, 33], [1, 100], [44, 42]] 

g = nx.DiGraph() 
for subgraph in result: 
    g.add_path(subgraph) 

u = g.to_undirected() 
output=[] 
for component in nx.connected_component_subgraphs(u): 
    output.append(component.nodes()) 

print(output) 
# [[33, 42, 411, 44, 61], [1, 100]] 

这应该是相当有效的,但如果你的数据量非常大,将是有意义的使用更可扩展的图形分析工具。星火确实有所谓GraphX的图形处理库:

https://spark.apache.org/docs/latest/graphx-programming-guide.html

不幸的是,pyspark实施滞后有点落后。所以,如果你打算使用这样的东西,你可能会被卡住使用scala-spark或完全不同的框架。

+0

您的解决方案运行得非常快!即使有3k +列表。唯一没有用到真实例子的是单值表(例如[57]),其中没有包含输出,你有任何解释吗?!单值意味着此ID不在任何其他列表中。所以我只是在我使用你的代码之前将这个列表分成两部分,然后再将它们合并。 – mgruber

+0

此外,我尝试使用GraphX,但它如你所说没有在Python中工作。不知何故,我只能在我们的发行版中使用基于“Python”的脚本。我会和我们的分销架构师谈谈。 – mgruber

1

我认为你可以使用RDD的aggregate动作。下面我将示例实现放在Scala中。请注意,我使用递归来提高可读性,但为了提高性能,重新实现这些函数是个好主意。

def overlap(s1: Seq[Int], s2: Seq[Int]): Boolean = 
    s1.exists(e => s2.contains(e)) 

def mergeSeq(s1: Seq[Int], s2: Seq[Int]): Seq[Int] = 
    s1.union(s2).distinct 

def mergeSeqWithSeqSeq(s: Seq[Int], ss: Seq[Seq[Int]]): Seq[Seq[Int]] = ss match { 
    case Nil => Seq(s) 
    case h +: tail => 
     if(overlap(h, s)) mergeSeqWithSeqSeq(mergeSeq(h, s), tail) 
     else h +: mergeSeqWithSeqSeq(s, tail) 
} 

def mergeSeqSeqWithSeqSeq(s1: Seq[Seq[Int]], s2: Seq[Seq[Int]]): Seq[Seq[Int]] = s1 match { 
    case Nil => s2 
    case h +: tail => mergeSeqWithSeqSeq(h, mergeSeqSeqWithSeqSeq(tail, s2)) 
} 

val result = rdd 
    .aggregate(Seq.empty[Seq[Int]]) (
     {case (ss, s) => mergeSeqWithSeqSeq(s, ss)}, 
     {case (s1, s2) => mergeSeqSeqWithSeqSeq(s1, s2)} 
    )