2017-08-05 108 views
0

假设我想使用创建方式昂贵的对象来映射RDD。我想为每个工作者/线程创建一个对象,并且必须在处理每个工作者上的RDD分区的项目之前创建它。在Apache Spark上为每个工作人员创建一个单身人士

我的解决办法是:

final Function0<ModelEvaluator> f =() -> { 

     if (ModelEvaluator.getInstance() == null) { 
      ModelEvaluator m = new ModelEvaluator(script); 
      ModelEvaluator.setInstance(m); 
     } 

     return ModelEvaluator.getInstance(); 
    }; 

    JavaPairRDD<Double, List<Service>> results = cartesian.mapToPair(
      (t) -> { 
       try { 
        double val = f.call().evaluateModel(t); 
        return new Tuple2<>(val, t); 
       } catch (Exception ex) { 
        return null; 
       } 
      } 
    ); 



public class ModelEvaluator { 

    private static ModelEvaluator instance; 

    public static void setInstance(ModelEvaluator instance) { 
    ModelEvaluator.instance = instance; 
    } 

    public static ModelEvaluator getInstance() { 
     return instance; 
    } 
... 

在这种情况下,“ModelEvaluator”对象分析的脚本,然后使用“服务”对象的列表,以便计算出相应的响应度量配置模型参数该参数配置。但我不想在每次处理RDD行时解析脚本。

我还配置了我的集群为每个集群创建一个进程,并且每个进程只会产生一个工人,因为同一进程中多个工人同时访问一个具有可变状态的单例实例时会出现问题。

有没有更适合我的问题的解决方案?

回答

1

这可以通过Broadcast变量完成。这将允许您在驱动程序上创建一个对象,并根据需要将其发送给每个工作人员一次。

final Broadcast<ModelEvaluator> model = jsc.broadcast(new ModelEvaluator(script)); 

JavaPairRDD<Double, List<Service>> results = cartesian.mapToPair(
     (t) -> { 
      try { 
       double val = model.value().evaluateModel(t); 
       return new Tuple2<>(val, t); 
      } catch (Exception ex) { 
       return null; 
      } 
     } 
); 
+1

非常感谢你,它像一个魅力工作。我需要制作“ModelEvaluator”类Serializable,并将某些字段配置为瞬态以避免出现问题。我需要使用一些逻辑来执行对象的延迟初始化,而不是在构造函数上初始化它。 –