2014-12-02 50 views
1

我正在SparkSQL上工作。我使用JavaPairRDD从HBase获取数据,然后制作了一张地图。在地图中,我将所有的钥匙保存到一个Set中。为了强制完成这张地图,我们遵循collect()。 在此之后,我使用Set中的值来执行其他操作。Spark懒惰转换执行障碍

该程序可以在我的本地PC上完美工作。但是当我把它放到集群(2名工人)时,就存在执行障碍。在地图转换之前,执行Set操作。

代码流是这样的: 从HBase的获取数据:

JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =  jsc.newAPIHadoopRDD(hbase_conf, 
       TableInputFormat.class, ImmutableBytesWritable.class, 
       Result.class); 

变换数据:

JavaRDD<Map<String, String>> data = hBaseRDD.map(
       new Function<Tuple2<ImmutableBytesWritable, Result>, Map<String, String>>(){ 
        public Map<String, String> call(
          Tuple2<ImmutableBytesWritable, Result> re) 
          throws Exception { 
         byte[] payload =re._2().getValue(Bytes.toBytes("ContentInfo"), Bytes.toBytes("Payload")); 
         Map<String, String> map = new ConcurrentHashMap<String, String>(); 

         String primaryKey = new String(re._1().get()); 
         map.put("primaryKey", primaryKey); 

         if(payload != null) 
          map.put("payload", new String(payload)); 

         Map<byte[], byte[]> tmpMetaMap = re._2().getFamilyMap(Bytes.toBytes("MetaInfo")); 
         if(tmpMetaMap != null){ 
          for(Entry<byte[], byte[]> entry : tmpMetaMap.entrySet()){ 

           String tmpKey = Bytes.toString(entry.getKey()); 
           String tmpValue = Bytes.toString(entry.getValue()); 

           map.put(tmpKey, tmpValue); 
    //save result to the set 
           keySet.add(tmpKey); 
          } 
         } 
         return map; 
        } 
       }); 

力上述地图来运行:

data.collect(); 

获取结果套装:

StringBuilder sb = new StringBuilder(); 

     for(String fieldName: keySet){ 

      sb.append(fieldName).append(","); 
     } 

当我在本地运行的代码,我可以得到所有的结果。但是当我在集群上运行它时,sb没有任何价值。

回答

0

这个问题是不相关的操作的顺序,而是其中集群中的这种行动正在发生。

火花,这里有两种类型的操作:变革与行动。

转换转换和RDD到另一个RDD通过应用一些功能的内容。这是一种纯粹的功能性方法,无副作用。 动作采用RDD并产生其他内容,如文件或本地数据结构:这些操作将RDD的数据实现为其他形式。

在这种情况下,转换函数:map正在使用带有副作用,因为keyset预计会在映射转换期间发生变异。 鉴于keyset在转换函数的范围之外定义,它会被序列化和发送到执行者,但任何突变发生远程将不会在驱动程序恢复。

如果我们仔细想想,遗嘱执行人将申请对数据的分区改造,使任何内容`键集”结尾,将只有每个分区的局部视图。

模型正确的方法是重新定义在RDD变革和行动方面的操作。

从上面的代码看来,我们想要将某些输入转换为RDD[Map[String,String]],并且我们有兴趣从驱动程序中收集所有不是“主键”和“有效负载”条目的键集合结果。

火花,这可能是这样的:

// data = RDD[Map[String, String]] 
// first we get all the keys from all the maps 
val keys = data.map{entry => entry.keys} 
// now we collect that information on the driver 
val allKeys = keys.collect 
// we transform the resulting array into a set - this will remove duplicates by definition 
val allKeySet = allKeys.toSet 
// We need still to remove "primaryKey" and "payload" 
val keySet = fullKeySet.diff(Set("primaryKey","payload")) 

在Java代码更详细一点,但结构和思路是一致的。

+0

另一个问题:为什么在本地运行时可以设置keySet值? – user2965590 2014-12-03 16:08:46

0

您是如何定义键集的?尝试将它定义为静态或以其它方式使用foreach代替map这将对所有的数据到这个DriverSide.Hope回答你的问题

+0

是的,我将keySet定义为最终的静态HashSet。 我也在hBaseRDD之后尝试了一个简单的foreach。只是一个习惯而没有遵循。它在火花服务器上也不起作用。 – user2965590 2014-12-03 00:37:39