2015-11-05 74 views
1

我有对象来处理的数组:Objects,我有一个函数,它接受在字典中,和一个对象,并返回相同的字典,改性:如何在spark中并行处理返回字典?

new_dict = modify_object_dict(object_dict, object) 

modify_object_dict执行以下操作:

  • 增加了一个密钥给字典这是处理

  • 对象的名称的DIC内创建一个字典作为一个值给键(辞典),其中添加了元素并从中删除了元素。

  • 例如,该对象可能是一个文件:object_dict['file_name']=sub_dictionary,子字典可能包含sub_dictionary['file_attribute']=attribute

modify_object_dict填充这些子字典,如上所示,其结果是保持子字典一个字典。

请注意,子词典不会相互影响。即一个对象的字典不与另一个对象的字典交互。

我想用火花并行这些对象的处理:

object_dict = {} # dictionary is initially empty 
RDD = (sc.parallelize(Objects) 
    .map(lambda object: modify_object_dict(object_dict, object)) 

这是正确的方式做到这一点?如果不是,每次调用映射函数时,返回被修改的字典的正确方法是什么?

+1

不幸的是,你不能修改的任务内的操作上的驱动程序存在的变量遗嘱执行人。 'modify_object_dict()'做了什么?也许,你可以在任务中应用你的处理,然后利用'collectAsMap()'? –

回答

3

返回每次调用映射函数时都会修改的字典的正确方法是什么?

简短的回答是没有。由于每个分区都是单独处理的,因此无法创建具有读/写访问权限的共享对象。 Spark仅支持两种类型的共享变量,即累加器和广播,分别为只写和只读访问。

长答案取决于modify_object_dict内部到底发生了什么。如果您使用的操作是关联性和交换性的并且可以在关键的基础上执行(每个对象可以映射到特定键上的操作),则可以使用aggregateByKey的某些变体。也可以使用mapPartitions在本地分区数据和进程。

如果modify_object_dict不符合上述标准,那么Spark很可能不是一个好的选择。将状态推送到外部系统是可能的,但通常没有意义,除非Spark用于繁重的工作,并且所有推到外面的都是最终结果。

此外,你不应该使用map副作用。在这种情况下,正确的方法通常是foreach。这里还有一个更微妙的问题。无法保证每个元素只能执行一次map(或foreach)。这意味着您执行的每项操作都必须是幂等的。

编辑

根据您的描述,似乎你可以尝试以下方法:

  • 首先让我们创建RDD一个虚拟类:

    class Foobar(object): 
        def __init__(self, name, x=None, y=None, z=None): 
         self.name = name 
         self.x = x 
         self.y = y 
         self.z = z 
    

    和RDD对象:

    objects = sc.parallelize([ 
        {"name": "foo", "x": 1}, {"name": "foo", "y": 3}, 
        {"name": "bar", "z": 4} 
    ]).map(lambda x: Foobar(**x)) 
    
  • 接下来可以将其转换为PairwiseRDD,其中名称为键和对象作为值。如果对象很大,则只能提取感兴趣的字段并将它们用作值。我假设每个对象都有name属性。

    pairs = objects.map(lambda obj: (obj.name, obj)) 
    
  • 要么groupByKey和变换值:

    rdd = pairs.groupByKey().mapValues(lambda iter: ...) 
    

    aggregateByKey(推荐):

    def seq_op(obj_dict, obj): 
        # equivalent to modify_object_dict 
        # Lets assume it is as simple as this 
        obj_dict.update((k, getattr(obj, k)) for k in ("x", "y", "z")) 
        return obj_dict 
    
    def comb_op(obj_dict_1, obj_dict_2): 
        # lets it is a simple union 
        obj_dict_1.update(obj_dict_2) 
        return obj_dict_1 
    
    dicts = pairs.aggregateByKey({}, seq_op, comb_op) 
    
  • 在这一刻,你有对(name, dict)的RDD。它可用于进一步加工或如果你真的需要一个本地结构所收集的地图:

    dicts.collectAsMap() 
    ## {'bar': {'x': None, 'y': None, 'z': 4}, 
    ##  'foo': {'x': None, 'y': 3, 'z': None}} 
    
+0

是由于分区复制而导致的最后一段吗? –

+1

@RohanAletty不,这只是一个韧性问题。如果执行器丢失并且集群管理器可以从该任务中恢复,则可以在另一台机器上重新安排时间。如果数据没有被缓存,那么显然它将在每次访问后代rdds时执行,等等。您唯一的保证是来自操作或转换的输出是正确的。 – zero323

+0

正确,合理。 –