我正在SparkSQL上工作。我使用JavaPairRDD从HBase获取数据,然后制作了一张地图。在地图中,我将所有的钥匙保存到一个Set中。为了强制完成这张地图,我们遵循collect()。 在此之后,我使用Set中的值来执行其他操作。Spark懒惰转换执行障碍
该程序可以在我的本地PC上完美工作。但是当我把它放到集群(2名工人)时,就存在执行障碍。在地图转换之前,执行Set操作。
代码流是这样的: 从HBase的获取数据:
JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc.newAPIHadoopRDD(hbase_conf,
TableInputFormat.class, ImmutableBytesWritable.class,
Result.class);
变换数据:
JavaRDD<Map<String, String>> data = hBaseRDD.map(
new Function<Tuple2<ImmutableBytesWritable, Result>, Map<String, String>>(){
public Map<String, String> call(
Tuple2<ImmutableBytesWritable, Result> re)
throws Exception {
byte[] payload =re._2().getValue(Bytes.toBytes("ContentInfo"), Bytes.toBytes("Payload"));
Map<String, String> map = new ConcurrentHashMap<String, String>();
String primaryKey = new String(re._1().get());
map.put("primaryKey", primaryKey);
if(payload != null)
map.put("payload", new String(payload));
Map<byte[], byte[]> tmpMetaMap = re._2().getFamilyMap(Bytes.toBytes("MetaInfo"));
if(tmpMetaMap != null){
for(Entry<byte[], byte[]> entry : tmpMetaMap.entrySet()){
String tmpKey = Bytes.toString(entry.getKey());
String tmpValue = Bytes.toString(entry.getValue());
map.put(tmpKey, tmpValue);
//save result to the set
keySet.add(tmpKey);
}
}
return map;
}
});
力上述地图来运行:
data.collect();
获取结果套装:
StringBuilder sb = new StringBuilder();
for(String fieldName: keySet){
sb.append(fieldName).append(",");
}
当我在本地运行的代码,我可以得到所有的结果。但是当我在集群上运行它时,sb没有任何价值。
另一个问题:为什么在本地运行时可以设置keySet值? – user2965590 2014-12-03 16:08:46