2017-04-07 85 views
1

是否有可能加盟星火2个RDDS上的自定义功能? 我有两个大字符串作为关键的RDD。我希望他们不要采用经典的加入加入,但如自定义功能:加入自定义函数的两个RDDS - SPARK

def my_func(a,b): 
    return Lev.distance(a,b) < 2 

result_rdd = rdd1.join(rdd2, my_func) 

如果这是不可能的,没有任何替代方案,将继续使用火花集群的好处是什么? 我写了类似的东西,但是pyspark将无法分配我的小群集上的工作。提前(和对不起我的英语,因为我是意大利人)

def custom_join(rdd1, rdd2, my_func): 
    a = rdd1.sortByKey().collect() 
    b = rdd2.sortByKey().collect() 
    i = 0 
    j = 0 
    res = [] 
    while i < len(a) and j < len(b): 
     if my_func(a[i][0],b[j][0]): 
      res += [((a[i][0],b[j][0]),(a[i][1],b[j][1]))] 
      i+=1 
      j+=1 
     elif a[i][0] < b[j][0]: 
      i+=1 
     else: 
      j+=1 

    return sc.parallelize(res) 

感谢

回答

2

您可以使用笛卡尔,然后筛选根据条件。

from pyspark.sql import SparkSession 
spark = SparkSession.builder.getOrCreate() 
sc = spark.sparkContext 
x = sc.parallelize([("a", 1), ("b", 4)]) 
y = sc.parallelize([("a", 2), ("b", 3)]) 

def customFunc(x): 
    # You may use any condition here 
    return x[0][0] ==x[1][0] 

print(x.join(y).collect()) # normal join 
# replicating join with cartesian 
print(x.cartesian(y).filter(customFunc).flatMap(lambda x:x).groupByKey().mapValues(tuple).collect()) 

输出:

[('b', (4, 3)), ('a', (1, 2))] 
[('a', (1, 2)), ('b', (4, 3))] 
+0

谢谢,但我觉得比起加入笛卡儿积将是非常低效的。我正在处理一个包含大约2M条目的数据库。 –

+0

是否可以使用数据框API? – Himaprasoon

+0

数据框是否与群集计算兼容? –