2017-06-03 41 views
1

什么是可以传递给SparkContext.parallelize来创建RDD的元素的限制?更具体地说,如果我使用Python创建自定义类,那么需要实现哪些方法才能确保它在RDD中正常工作?我假设它需要实施__eq____hash__并且可以分拣。还有什么?有关文件的链接将不胜感激。我无法找到任何地方。什么样的对象可以是Spark RDD中的元素?

回答

1

严格地说,唯一的硬性要求是类是可序列化的(可挑选的),尽管对于生命周期仅限于单个任务(既不混洗也不收集/并行化)的对象来说不是必需的。

一致__hash____eq__仅需要如果类将(在byKey操作作为密钥)被用作混洗键,直接或间接地(例如,用于distinctcache)。

此外,类定义必须可以在每个工作节点上导入,因此模块必须已经存在于PYTHONPATHpyFiles中。如果类依赖于本地依赖关系,那么它们也必须存在于每个工作节点上。

最后,对于排序类型,必须使用标准Python语义进行定制。

总结:

  • 无特殊要求,除了被导入的:

    class Foo: 
        ... 
    
    # objects are used locally inside a single task 
    rdd.map(lambda i: Foo(i)).map(lambda foo: foo.get(i)) 
    
  • 必须是可序列化:

    # Has to be pickled to be distributed 
    sc.parallelize([Foo(1), Foo(2)]) 
    
    # Has to be pickled to be persisted 
    sc.range(10).map(lambda i: Foo(i)).cache() 
    
    # Has to be pickled to be fetched to the driver 
    sc.range(10).map(lambda i: Foo(i)).collect() # take, first, etc. 
    
  • 必须是Hashable

    # Explicitly used as a shuffle key 
    sc.range(10).map(lambda i: (Foo(i), 1)).reduceByKey(add) # *byKey 
    
    # Implicitly used as a shuffle kye 
    sc.range(10).map(lambda i: Foo(i)).distinct # subtract, etc. 
    

此外,所有通过闭包传递的变量都必须是可序列化的。

+0

非常好,谢谢! –