2017-10-14 64 views
0

我试图在第一列作为关键字执行两个RDD之间的连接。该RDDS样子:spark reducebykey并忽略其余

RDD1: 
(k1,(s11,s12,s13)) 
(k2,(s21,s22,s23)) 
(k3,(s31,s32,s33)) 
... 

RDD2: 
(k1,(t11,t12,t13)) 
(k2,(t21,t22,t23)) 
(k4,(t41,t42,t43)) 
... 

文从一个RDD可能或不可能找到另外一个匹配。但是,如果确实找到了匹配项,它将只与其他RDD的一行相匹配。换句话说,ki是两个RDD的主键。

我被

RDD3=RDD1.union(RDD2).reduceByKey(lambda x,y:(x+y)).filter(lambda x:len(x[1])==6) 

这样做的结果RDD会是什么样子:

RDD3: 
(k1,(s11,s12,s13,t11,t12,t13)) 
(k2,(s21,s22,s23,t21,t22,t23)) 
... 

我想避免使用filter功能,而计算RDD3。它看起来像一个可避免的计算。使用内置火花功能可以做到这一点吗?我不想用火花SQL或dataframes

回答

1

您需要join方法后跟一个mapValues方法从相同的密钥串连值:

rdd1.join(rdd2).mapValues(lambda x: x[0] + x[1]).collect() 
# [('k2', ('s21', 's22', 's23', 't21', 't22', 't23')), 
# ('k1', ('s11', 's12', 's13', 't11', 't12', 't13'))]