2015-12-15 51 views
0

我找了很多关于从HBase的火花加载数据的例子中,一个为我工作是星火批量删除记录在HBase的

Configuration config = HBaseConfiguration.create(); 
    config.set(TableInputFormat.INPUT_TABLE, props.getProperty(ConfigConstants.HBASE_SRC_TABLE_NAME)); 
    config.set(TableInputFormat.SCAN_MAXVERSIONS, props.getProperty(ConfigConstants.HBASE_SRC_TABLE_VERSIONS)); 
    config.set(TableInputFormat.SCAN_COLUMN_FAMILY, HbaseConstants.MAPPING_FAMILY); 
    config.set(TableInputFormat.SCAN_TIMERANGE_START, "0"); 
    config.set(TableInputFormat.SCAN_TIMERANGE_END, startTimestamp + "000"); 

    RDD<Tuple2<ImmutableBytesWritable, Result>> tupleRDD = context.newAPIHadoopRDD(config, TableInputFormat.class, 
      ImmutableBytesWritable.class, Result.class); 

但是我真的需要一种方法来删除的记录被装入火花一旦他们被处理。

试图将tupleRDD映射到JavaPairRDD<ImmutableBytesWritable, Delete>,然后用

JobConf jobConf = new JobConf(config); 
    jobConf.setOutputFormat(org.apache.hadoop.hbase.mapred.TableOutputFormat.class); 
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, props.getProperty(ConfigConstants.HBASE_TARGET_TABLE_NAME)); 
    outputPairsRDD.saveAsHadoopDataset(jobConf); 

但是这给了我像下面

"main" org.apache.hadoop.mapred.InvalidJobConfException: Output directory not set. 

有没有办法做到从火花删除一个例外?

回答

0

事实证明,没有简单的方法来做到这一点,我的最终解决方案是通过hbase删除功能获取数据并删除它们,而不是使用hbase的spark-version删除。