2017-03-03 426 views
2

acutally我在MapReduce和Bulkload的帮助下将数据加载到Hbase中,这是我在Java中实现的。 所以基本上我创建了一个映射器并使用HFileOutputFormat2.configureIncrementalLoad(完整代码在问题的末尾)进行约简,我使用一个映射器,它只是从文件中读取一些字节并创建一个put。写出这个使用LoadIncrementalHFiles.doBulkLoad写入数据到Hbase。这一切都很好。但是肯定的时候,它会覆盖Hbase中的旧值。所以我正在寻找一种方法来追加数据,就像api工程中的追加函数一样。 感谢您的阅读,希望你们当中有些人有一个想法,可以帮助我:)Hbase Bulkload追加数据而不是覆盖它们

public int run(String[] args) throws Exception { 
    int result=0; 
    String outputPath = args[1]; 
    Configuration configuration = getConf(); 
    configuration.set("data.seperator", DATA_SEPERATOR); 
    configuration.set("hbase.table.name",TABLE_NAME); 
    configuration.set("COLUMN_FAMILY_1",COLUMN_FAMILY_1); 
    configuration.set("COLUMN_FAMILY_2",COLUMN_FAMILY_2); 

    Job job = Job.getInstance(configuration); 
    job.setJarByClass(HBaseBulkLoadDriver.class); 
    job.setJobName("Bulk Loading HBase Table::"+TABLE_NAME); 
    job.setInputFormatClass(TextInputFormat.class); 
    job.setMapOutputKeyClass(ImmutableBytesWritable.class); 
    job.setMapperClass(HBaseBulkLoadMapper.class); 

    FileInputFormat.addInputPaths(job, args[0]); 
    FileSystem.getLocal(getConf()).delete(new Path(outputPath), true); 
    HFileOutputFormat2.setOutputPath(job,new Path((outputPath))); 
    job.setMapOutputValueClass(Put.class); 
    Connection c = ConnectionFactory.createConnection(configuration); 
    Table t = c.getTable(TableName.valueOf(TABLE_NAME)); 
    RegionLocator rl = c.getRegionLocator(TableName.valueOf(TABLE_NAME)); 
    HFileOutputFormat2.configureIncrementalLoad(job,t,rl); 
    System.out.println("start"); 
    job.waitForCompletion(true); 
    if (job.isSuccessful()) { 
     HBaseBulkLoad.doBulkLoad(outputPath, TABLE_NAME); 
    } else { 

     result = -1; 
    } 
    return result; 
} 



public static void doBulkLoad(String pathToHFile, String tableName) { 
    try { 
     Configuration configuration = new Configuration(); 
     configuration.set("mapreduce.child.java.opts", "-Xmx1g"); 
     HBaseConfiguration.addHbaseResources(configuration); 
     LoadIncrementalHFiles loadFfiles = new LoadIncrementalHFiles(configuration); 


     //HTable hTable = new HTable(configuration, tableName); 
     //loadFfiles.doBulkLoad(new Path(pathToHFile), hTable); 

     Connection connection = ConnectionFactory.createConnection(configuration); 
     Table table = connection.getTable(TableName.valueOf(tableName)); 
     Admin admin = connection.getAdmin(); 
     RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableName)); 
     //path, admin, table, region locator 
     loadFfiles.doBulkLoad(new Path(pathToHFile),admin,table,regionLocator); 


     System.out.println("Bulk Load Completed.."); 
    } catch(Exception exception) { 
     exception.printStackTrace(); 
    } 

正如意见中的要求,我在这里添加表描述的输出,导致表被蟒蛇happybase创建API和我'不知道什么optionflags的API可以由默认设置...

{NAME => '0', BLOOMFILTER => 'NONE', VERSIONS => '3', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_B LOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'false', BLO CKSIZE => '65536', REPLICATION_SCOPE => '0'}
{NAME => '1', BLOOMFILTER => 'NONE', VERSIONS => '3', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_B LOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'false', BLO CKSIZE => '65536', REPLICATION_SCOPE => '0'}

+1

HBase批量加载默认附加数据,如果您将表和列族配置为只存储一行版本,则不会擦除旧日期,除非此情况。你可以添加到帖子你是如何创建你的表? – maxteneff

+0

嘿,我用Happybase api创建了表格,所以添加了表格描述...当我尝试我的源代码时,将相同组合的rowkey,family和列描述符放在两个不同的值中,然后从此检索列我只获得最后一个值。但是,如果第一个放入字符串Value1,第二个放入Value2,我想要有像“Value1Value2”这样的东西。 – Pils19

+1

您是如何检查两个键后只有一个版本的行的?如果您在两次单独的批量加载过程中尝试插入两个不同的密钥,会发生什么? – maxteneff

回答

1

在HFileOutputFormat2.configureIncrementalLoad()http://atetric.com/atetric/javadoc/org.apache.hbase/hbase-server/1.2.4/src-html/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.html#line.408 PutSortReducer用作还原剂。

PutSortReducer.reduce()http://atetric.com/atetric/javadoc/org.apache.hbase/hbase-server/1.2.4/src-html/org/apache/hadoop/hbase/mapreduce/PutSortReducer.html KeyValues存储在TreeSet中,仅使用比较器比较键。这就是为什么只有一个价值存活。

要保留2个值,您可以创建基于PutSortReducer的自己的reducer,您可以在其中保留2个值。并设置它:

HFileOutputFormat2.configureIncrementalLoad(job,t,rl); job.setReducerClass(MyReducer.class);

+0

是的创建自定义缩减器可能工作,所以你不会覆盖文件具有相同的密钥这种bulkload,但它不能解决其他问题,我想将文件中的数据添加到已创建新版本的数据中。 – Pils19

+1

覆盖现有版本是一种普遍的HBase行为,不是特定的bulkload。为了解决这个问题,在自定义reducer中,您可以读取HBase中的数据,并添加新的值。 –

+0

是的,我知道,这是一个普遍的行为,但我想找到一个像append函数一样读取数据的地区服务器和concat从输入和旧字节的方式... – Pils19

相关问题