2013-01-11 71 views
3

我是Hive和MapReduce的新手,非常感谢您的回答,并且提供了正确的方法。从MapReduce作业向Hive添加分区

我已经定义了一个外部表logs在hive分区日期和原始服务器上的外部位置hdfs /data/logs/。我有一个MapReduce作业,它获取这些日志文件并将它们拆分并存储在上述文件夹下。像

"/data/logs/dt=2012-10-01/server01/" 
"/data/logs/dt=2012-10-01/server02/" 
... 
... 

从MapReduce工作,我想补充分区蜂巢中的表格日志。我知道这两种方法

  1. alter table命令 - 太多的ALTER TABLE命令
  2. 添加动态分区

对于方法有两个我只看到INSERT OVERWRITE例子这是不是我的选择。有没有办法在作业结束后将这些新分区添加到表中?

回答

3

要在Map/Reduce作业中执行此操作,我建议使用Apache HCatalog,这是一个在Hadoop下加盖的新项目。

HCatalog确实是HDFS上的一个抽象层,因此您可以用标准方式编写输出,无论是Hive,Pig还是M/R。如果这是你的图片,你可以使用输出格式HCatOutputFormat从你的Map/Reduce作业直接将数据加载到Hive中。以下是取自the official website的示例。

写了一个特定的分区(A = 1,B = 1)会去像这样的电流代码示例:

Map<String, String> partitionValues = new HashMap<String, String>(); 
partitionValues.put("a", "1"); 
partitionValues.put("b", "1"); 
HCatTableInfo info = HCatTableInfo.getOutputTableInfo(dbName, tblName, partitionValues); 
HCatOutputFormat.setOutput(job, info); 

和写入到多个分区,单独的作业将不得不与上述每个开始。

您也可以在HCatalog中使用动态分区,在这种情况下,您可以在同一个作业中加载任意多个分区!

我建议您在上面提供的网站上进一步阅读HCatalog,如果需要,应该会提供更多详细信息。

+0

我使用Cloudera的分布,它不具有与捆绑HCatalog。 Oozie可以成为一种选择吗?如果是这样的话,有什么想法? – user1971133

+0

如果你不想要很多alter table语句,不想做一个插入覆盖,也不能使用HCatalog,这在我看来会变得很复杂。 Oozie只是一个工作流程调度器,你仍然需要在某个地方定义一个工作。 –

3

事实上,事情比这更复杂一点,这很不幸,因为它在官方消息中是没有文档的(截至目前),并且需要花费几天的时间才能弄清楚。

我发现,我需要做下面让HCatalog MapReduce作业与写入动态分区工作:

在我的工作(通常是减速)的我的记录写入阶段,我必须要手动将我的动态分区(HCatFieldSchema)添加到我的HCatSchema对象。

麻烦的是,HCatOutputFormat.getTableSchema(config)实际上并不返回分区字段。他们需要手动添加

HCatFieldSchema hfs1 = new HCatFieldSchema("date", Type.STRING, null); 
HCatFieldSchema hfs2 = new HCatFieldSchema("some_partition", Type.STRING, null); 
schema.append(hfs1); 
schema.append(hfs2); 
0

下面是写与动态划分多个表中使用HCatalog一个工作的代码,代码已经在Hadoop 2.5.0,蜂巢0.13测试。1:

// ... Job setup, InputFormatClass, etc ... 
String dbName = null; 
String[] tables = {"table0", "table1"}; 

job.setOutputFormatClass(MultiOutputFormat.class); 
MultiOutputFormat.JobConfigurer configurer = MultiOutputFormat.createConfigurer(job); 

List<String> partitions = new ArrayList<String>(); 
partitions.add(0, "partition0"); 
partitions.add(1, "partition1"); 

HCatFieldSchema partition0 = new HCatFieldSchema("partition0", TypeInfoFactory.stringTypeInfo, null); 
HCatFieldSchema partition1 = new HCatFieldSchema("partition1", TypeInfoFactory.stringTypeInfo, null); 

for (String table : tables) { 
    configurer.addOutputFormat(table, HCatOutputFormat.class, BytesWritable.class, CatRecord.class); 

    OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, table, null); 
    outputJobInfo.setDynamicPartitioningKeys(partitions); 

    HCatOutputFormat.setOutput(
     configurer.getJob(table), outputJobInfo 
    ); 

    HCatSchema schema = HCatOutputFormat.getTableSchema(configurer.getJob(table).getConfiguration()); 
    schema.append(partition0); 
    schema.append(partition1); 

    HCatOutputFormat.setSchema(
     configurer.getJob(table), 
     schema 
    ); 
} 
configurer.configure(); 

return job.waitForCompletion(true) ? 0 : 1; 

映射:

public static class MyMapper extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> { 
    @Override 
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 
     HCatRecord record = new DefaultHCatRecord(3); // Including partitions 
     record.set(0, value.toString()); 

     // partitions must be set after non-partition fields 
     record.set(1, "0"); // partition0=0 
     record.set(2, "1"); // partition1=1 

     MultiOutputFormat.write("table0", null, record, context); 
     MultiOutputFormat.write("table1", null, record, context); 
    } 
}