2016-08-05

I wrote a map function that uses itertools.groupby to aggregate data; my code is shown below. It uses itertools.groupby inside PySpark, but it fails.

Driver code

pair_count = df.mapPartitions(lambda iterable: pair_func_cnt(iterable))
pair_count.collect()

Map function

def pair_func_cnt(iterable): 
    from itertools import groupby 

    ls = [[1,2,3],[1,2,5],[1,3,5],[2,4,6]] 
    grp1 = [(k,g) for k,g in groupby(ls, lambda e: e[0])] 
    grp2 = [(k,g) for k,g in groupby(grp1, lambda e: e[1])] 
    return iter(grp2) 

But it gives the following error:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/opt/zeppelin-0.6.0-bin-netinst/interpreter/spark/pyspark/pyspark.zip/pyspark/worker.py", line 111, in main 
    process() 
    File "/opt/zeppelin-0.6.0-bin-netinst/interpreter/spark/pyspark/pyspark.zip/pyspark/worker.py", line 106, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/opt/zeppelin-0.6.0-bin-netinst/interpreter/spark/pyspark/pyspark.zip/pyspark/serializers.py", line 267, in dump_stream 
    bytes = self.serializer.dumps(vs) 
    File "/opt/zeppelin-0.6.0-bin-netinst/interpreter/spark/pyspark/pyspark.zip/pyspark/serializers.py", line 415, in dumps 
    return pickle.dumps(obj, protocol) 
PicklingError: Can't pickle <type 'itertools._grouper'>: attribute lookup itertools._grouper failed 
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    ... 1 more 

Answer


Python's pickle cannot serialize anonymous (lambda) functions. Let's illustrate this with a simple example:

import pickle
from itertools import groupby

xs = [[1, 2, 3], [1, 2, 5], [1, 3, 5], [2, 4, 6]]
pickle.dumps([x for x in groupby(xs, lambda x: x[0])])

## PicklingError 
## ... 
## PicklingError: Can't pickle ... 

You should get rid of all references to lambdas before the data is serialized:

from operator import itemgetter

pickle.dumps([(k, list(v)) for (k, v) in groupby(xs, itemgetter(0))])

## b'\x80\x ... 

Or avoid lambda expressions entirely:

from operator import itemgetter 

pickle.dumps([kv for kv in groupby(xs, itemgetter(0))]) 

## b'\x80\x ... 
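Applied back to the question's mapPartitions function, a minimal sketch of a picklable variant might look like the following. This assumes the intent is simply to group the rows by their first element; the question's second groupby over grouper objects is dropped here, since grouping by distinct grouper instances produces one group per tuple. Every group is materialized with list() so the returned records contain only plain lists and tuples:

```python
import pickle
from itertools import groupby
from operator import itemgetter

def pair_func_cnt(iterable):
    # Hypothetical rewrite of the question's function: materialize each
    # group with list() inside the comprehension so no itertools._grouper
    # objects (and no lambda references) survive into the returned records.
    ls = [[1, 2, 3], [1, 2, 5], [1, 3, 5], [2, 4, 6]]
    grouped = [(k, list(g)) for k, g in groupby(ls, itemgetter(0))]
    return iter(grouped)

# The records now survive the serialization step PySpark performs
# when collecting mapPartitions output:
records = list(pair_func_cnt([]))
pickle.dumps(records)  # no PicklingError
```

Note that list(g) must be called while still inside the comprehension: groupby shares one underlying iterator, so grouper objects saved for later are invalidated as soon as the outer loop advances.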