2016-06-28 105 views
5

我正在尝试下面的代码,它将一个数字添加到RDD中的每一行,并使用PySpark返回一个RDD列表。PySpark评估

from pyspark.context import SparkContext 
file = "file:///home/sree/code/scrap/sample.txt" 
sc = SparkContext('local', 'TestApp') 
data = sc.textFile(file) 
splits = [data.map(lambda p : int(p) + i) for i in range(4)] 
print splits[0].collect() 
print splits[1].collect() 
print splits[2].collect() 

在输入文件(sample.txt的)的含量为:

1 
2 
3 

我(分别与0添加数字在RDD,1,2)期待像这样的输出:

[1,2,3] 
[2,3,4] 
[3,4,5] 

而实际产量为:

[4, 5, 6] 
[4, 5, 6] 
[4, 5, 6] 

这意味着理解只使用变量i的值3,而不考虑范围(4)

为什么会发生这种行为?

回答

3

它发生是因为Python后期绑定,并不是(Py)Spark特定的。 i将在使用lambda p : int(p) + i时查找,而不是在定义时查找。通常,它意味着什么时候被调用,但在这个特定的上下文中,它是在序列化时发送给工作人员的。

例如,你可以像这样做:

def f(i): 
    def _f(x): 
     try: 
      return int(x) + i 
     except: 
      pass 
    return _f 

data = sc.parallelize(["1", "2", "3"]) 
splits = [data.map(f(i)) for i in range(4)] 
[rdd.collect() for rdd in splits] 
## [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]] 
+0

我曾试图通过“P”,以一个简单的外部函数,以及内部函数(如一个在答案中)通过一个lambda调用,用于试验和错误目的。 注意到正确的行为,当我这样做:http://pastebin.com/z7E7wGKx 谢谢你回答为什么发生这种情况的原因。 – srjit

+0

值得注意的是,这发生在几乎任何语言的闭包/ lambdas,甚至C# –

2

这是由于这样的事实:通过lambda表达式参考参考我!它与火花无关。 See this

你可以试试这个:

a =[(lambda y: (lambda x: y + int(x)))(i) for i in range(4)] 
splits = [data.map(a[x]) for x in range(4)] 

或在一行

splits = [ 
    data.map([(lambda y: (lambda x: y + int(x)))(i) for i in range(4)][x]) 
    for x in range(4) 
] 
+1

如果你想使用'lambdas'有一个简单的技巧,避免嵌套:'[lambda x,i = i:i + int(x )我在范围内(4)]'。 – zero323