HBase bulk load: append data instead of overwriting it

I am loading data into HBase with MapReduce and bulk load, implemented in Java. Basically I create a mapper that just reads some bytes from a file and creates a Put, and I use HFileOutputFormat2.configureIncrementalLoad for the reduce side (full code at the end of the question). The data is then written into HBase with LoadIncrementalHFiles.doBulkLoad. This all works fine, but of course it overwrites the old values in HBase. So I am looking for a way to append data, like the append function in the client API does. Thanks for reading, and I hope some of you have an idea that can help me :)
public int run(String[] args) throws Exception {
    int result = 0;
    String outputPath = args[1];
    Configuration configuration = getConf();
    configuration.set("data.seperator", DATA_SEPERATOR);
    configuration.set("hbase.table.name", TABLE_NAME);
    configuration.set("COLUMN_FAMILY_1", COLUMN_FAMILY_1);
    configuration.set("COLUMN_FAMILY_2", COLUMN_FAMILY_2);
    Job job = Job.getInstance(configuration);
    job.setJarByClass(HBaseBulkLoadDriver.class);
    job.setJobName("Bulk Loading HBase Table::" + TABLE_NAME);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapperClass(HBaseBulkLoadMapper.class);
    FileInputFormat.addInputPaths(job, args[0]);
    FileSystem.getLocal(getConf()).delete(new Path(outputPath), true);
    HFileOutputFormat2.setOutputPath(job, new Path(outputPath));
    job.setMapOutputValueClass(Put.class);
    Connection c = ConnectionFactory.createConnection(configuration);
    Table t = c.getTable(TableName.valueOf(TABLE_NAME));
    RegionLocator rl = c.getRegionLocator(TableName.valueOf(TABLE_NAME));
    HFileOutputFormat2.configureIncrementalLoad(job, t, rl);
    System.out.println("start");
    job.waitForCompletion(true);
    if (job.isSuccessful()) {
        HBaseBulkLoad.doBulkLoad(outputPath, TABLE_NAME);
    } else {
        result = -1;
    }
    return result;
}
public static void doBulkLoad(String pathToHFile, String tableName) {
    try {
        Configuration configuration = new Configuration();
        configuration.set("mapreduce.child.java.opts", "-Xmx1g");
        HBaseConfiguration.addHbaseResources(configuration);
        LoadIncrementalHFiles loadFfiles = new LoadIncrementalHFiles(configuration);
        //HTable hTable = new HTable(configuration, tableName);
        //loadFfiles.doBulkLoad(new Path(pathToHFile), hTable);
        Connection connection = ConnectionFactory.createConnection(configuration);
        Table table = connection.getTable(TableName.valueOf(tableName));
        Admin admin = connection.getAdmin();
        RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableName));
        // path, admin, table, region locator
        loadFfiles.doBulkLoad(new Path(pathToHFile), admin, table, regionLocator);
        System.out.println("Bulk Load Completed..");
    } catch (Exception exception) {
        exception.printStackTrace();
    }
}
As requested in the comments, I am adding the output of the table description here. The table was created with the Python happybase API, and I don't know which option flags that API sets by default...
{NAME => '0', BLOOMFILTER => 'NONE', VERSIONS => '3', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'false', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
{NAME => '1', BLOOMFILTER => 'NONE', VERSIONS => '3', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'false', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
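Both families above were created with VERSIONS => '3', so up to three versions per cell can be retained. If more stored versions are needed, the maximum can be raised per column family from the HBase shell (the table name 'mytable' below is a placeholder):

```
alter 'mytable', {NAME => '0', VERSIONS => '10'}
describe 'mytable'
```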
HBase bulk load appends data by default and does not wipe out old data, unless you have configured the table and column family to store only a single version of each row. Could you add to your post how you created the table? – maxteneff
Hey, I created the table with the happybase API, so I added the table description above... When I tried my code and wrote two different values with the same combination of row key, family and column qualifier, then retrieved that column, I only got the last value. But if the first Put wrote "Value1" and the second "Value2", I would like to end up with something like "Value1Value2". – Pils19
How did you check that only one version of the row exists after the two Puts? And what happens if you try to insert the two different keys in two separate bulk-load runs? – maxteneff
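For reference, the behaviour Pils19 describes ("Value1" plus "Value2" yielding "Value1Value2") is exactly what the server-side Append mutation produces: byte-level concatenation of the new value onto the stored one. A minimal local illustration of that semantics in plain Java (no HBase dependency; the class and method names are just for this sketch):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class AppendSemantics {

    // Concatenate the new value onto the existing cell value,
    // mirroring what HBase's Append mutation does server-side.
    public static byte[] concat(byte[] existing, byte[] added) {
        byte[] merged = Arrays.copyOf(existing, existing.length + added.length);
        System.arraycopy(added, 0, merged, existing.length, added.length);
        return merged;
    }

    public static void main(String[] args) {
        byte[] v1 = "Value1".getBytes(StandardCharsets.UTF_8);
        byte[] v2 = "Value2".getBytes(StandardCharsets.UTF_8);
        // prints Value1Value2
        System.out.println(new String(concat(v1, v2), StandardCharsets.UTF_8));
    }
}
```

A bulk-loaded Put, by contrast, writes a complete new cell version; whether the old version remains readable depends on the family's VERSIONS setting, and a Get without a version range returns only the newest one.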