2017-10-14 71 views
0

我试图简化下面的代码:如何压平星火reduceByKey列表

k=sc.parallelize(["dog", "cat", 'ant']).map(lambda x: (len(x),x)) 
k.reduceByKey(lambda acc,x: [acc,x]).collect() 

为什么下面失败

k.reduceByKey(lambda acc,x: [x] if acc == None else acc.append(x)).collect() 

回答

1

这是我不清楚你可以用groupByKey

k.groupByKey().mapValues(lambda x: list(x)).collect() 
# [(3, ['dog', 'cat', 'ant'])] 

看来您将reduceByKeyfold方法混淆,该方法接受初始值。

对于reduceByKey

它接受一个交换和关联函数作为参数:

  • 参数函数应具有相同的数据类型
  • 的返回类型的两个参数功能也必须与参数类型相同

在您的lambda函数中,acc.append(x)例如返回None。

因此,如果使用reduceByKey,你可以这样做:

k.reduceByKey(lambda a, b: (a if isinstance(a, list) else [a]) + (b if isinstance(b, list) else [b])).collect() 
# [(3, ['dog', 'cat', 'ant'])]