2016-12-02 68 views
1

我有一个恼人的问题使用jupyter笔记本与火花。如何用PySpark和Jupyter分配类

我需要定义内的笔记本电脑的自定义类,并用它来执行一些地图操作

from pyspark import SparkContext 
from pyspark import SparkConf 
from pyspark import SQLContext 

conf = SparkConf().setMaster("spark://192.168.10.11:7077")\ 
       .setAppName("app_jupyter/")\ 
       .set("spark.cores.max", "10") 

sc = SparkContext(conf=conf) 

data = [1, 2, 3, 4, 5] 
distData = sc.parallelize(data) 

class demo(object): 
    def __init__(self, value): 
     self.test = value + 10 
     pass 

distData.map(lambda x : demo(x)).collect() 

它提供了以下错误:

PicklingError: Can't pickle : attribute lookup main.demo failed

我知道这个错误是关于,但我找不出解决办法..

我试过了:

  1. 在笔记本外定义一个demo.py python文件。它的工作原理,但它是这样一个丑陋的解决方案...
  2. 创建一个动态模块like this,然后再导入之后......这给了同样的错误

会有什么解决办法?...我想一切都在同类笔记本

它是可以改变的东西工作:

  1. 方式火花的作品,也许有些泡菜配置
  2. 东西代码...使用一些静态魔术方法

回答

1

这里没有可靠和优雅的解决方法,此行为与Spark没有特别的关系。 This is all about fundamental design of the Python pickle

pickle can save and restore class instances transparently, however the class definition must be importable and live in the same module as when the object was stored.

理论上你可以定义一个custom cell magic这将:

  • 写细胞对模块的内容。
  • 导入它。
  • 拨打SparkContext.addPyFile来分配模块。
from IPython.core.magic import register_cell_magic 
import importlib 

@register_cell_magic 
def spark_class(line, cell): 
    module = line.strip() 
    f = "{0}.py".format(module) 

    with open(f, "w") as fw: 
     fw.write(cell) 

    globals()[module] = importlib.import_module(module) 
    sc.addPyFile(f) 
In [2]: %%spark_class foo 
    ...: class Foo(object): 
    ...:  def __init__(self, x): 
    ...:   self.x = x 
    ...:  def __repr__(self): 
    ...:   return "Foo({0})".format(self.x) 
    ...: 

In [3]: sc.parallelize([1, 2, 3]).map(lambda x: foo.Foo(x)).collect() 
Out[3]: [Foo(1), Foo(2), Foo(3)]  

,但它是一次性交易。一旦文件被标记为分发,它就不能被更改或重新分发。此外,在远程主机上重新导入导入存在问题。我可以考虑一些更复杂的计划,但这只是比它的价值更麻烦。