2016-11-10 64 views
0

有一些过滤器的功能是这样的:如何在不同的pyspark脚本中共享相同的广播变量包含的过滤器函数?

def filter1(x): 
    if broadcast_variable1.value[x] > 1: 
     return False 
    return True 

def filter2(x): 
    if broadcast_variable2.value[x] < 1: 
     error_accumulator_variable.add(1) 
     return False 
    return True 

这些功能都包含在我的主要pyspark脚本。现在我想将它们分成一个模块文件以便于维护(我有两个用于不同用途的pyspark脚本,但它们具有相同的过滤器),然后将其余文件保存在不同的文件中。

如何在不同的pyspark脚本中共享这些类似的过滤功能?

感谢您的慷慨帮助

回答

0

用它作为论据功能:

def filter1(x, bv): 
    ... 

def filter2(x, bv): 
    ... 

broadcast_variable1 = ... 

someRDD.filter(lambda x: filter1(x, broadcast_variable1)) 
1

我的基本目的是为了使程序更易读。所以我找到了一个简单的方法来实现这一点。

def x1(x,broadcast_variable1): 
    if broadcast_variable1.value[x] > 1: 
    return False 

def filter1(x): 
    return x1(x,broadcast_variable1) 

,这也可以在钻营的方式(下面的代码是不是在Python)rewriten:

def x1(broadcast_variable)(x) = 
    if broadcast_variable.value[x] > 1: 
    return False 
def filter1(x) = x1(broadcast_variable1) 
现在

,我可以写火花像这样的代码:

RDD.filter(filter1).map(map1)....