2016-11-26 190 views
0

您好我有一个mapreduce应用程序可以将数据批量加载到HBase中。 我总共有142个文本文件的总大小为200GB。 我的mapper在5分钟内完成并且所有的reducer也都是100%,但最后一个还是卡住了。 它需要很长时间,并从过去24小时运行。 我有一个专栏家庭。 我的行键如下所示。Last Reducer从最近24小时运行200 GB数据集

48433197315 | 1972-03-31T00:00:00Z | 4 48433197315 | 1972-03-31T00:00:00Z | 38 48433197315 | 1972-03-31T00:00:00Z | 41 48433197315 | 1972-03-31T00 :00:00Z | 23 48433197315 | 1972-03-31T00:00:00Z | 7 48433336118 | 1972-03-31T00:00:00Z | 17 48433197319 | 1972-03-31T00:00:00Z | 64 48433197319 | 1972-03 -31T00:00:00Z | 58 48433197319 | 1972-03-31T00:00:00Z | 61 48433197319 | 1972-03-31T00:00:00Z | 73 48433197319 | 1972-03-31T00:00:00Z | 97 48433336119 | 1972 -03-31T00:00:00Z | 7

我已经创建了这样的表格。

private static Configuration getHbaseConfiguration() { 
    try { 
     if (hbaseConf == null) { 
     System.out.println(
      "UserId= " + USERID + " \t keytab file =" + KEYTAB_FILE + " \t conf =" + KRB5_CONF_FILE); 
     HBaseConfiguration.create(); 
     hbaseConf = HBaseConfiguration.create(); 
     hbaseConf.set("mapreduce.job.queuename", "root.fricadev"); 
     hbaseConf.set("mapreduce.child.java.opts", "-Xmx6553m"); 
     hbaseConf.set("mapreduce.map.memory.mb", "8192"); 
     hbaseConf.setInt(MAX_FILES_PER_REGION_PER_FAMILY, 1024); 
     System.setProperty("java.security.krb5.conf", KRB5_CONF_FILE); 
     UserGroupInformation.loginUserFromKeytab(USERID, KEYTAB_FILE); 
     } 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
    return hbaseConf; 
    } 

    /** 
    * HBase bulk import example Data preparation MapReduce job driver 
    * 
    * args[0]: HDFS input path args[1]: HDFS output path 
    * 
    * @throws Exception 
    * 
    */ 
    public static void main(String[] args) throws Exception { 

    if (hbaseConf == null) 
     hbaseConf = getHbaseConfiguration(); 
    String outputPath = args[2]; 
    hbaseConf.set("data.seperator", DATA_SEPERATOR); 
    hbaseConf.set("hbase.table.name", args[0]); 
    hbaseConf.setInt(MAX_FILES_PER_REGION_PER_FAMILY, 1024); 

    Job job = new Job(hbaseConf); 
    job.setJarByClass(HBaseBulkLoadDriver.class); 
    job.setJobName("Bulk Loading HBase Table::" + args[0]); 
    job.setInputFormatClass(TextInputFormat.class); 
    job.setMapOutputKeyClass(ImmutableBytesWritable.class); 
    job.setMapperClass(HBaseBulkLoadMapperUnzipped.class); 

    // job.getConfiguration().set("mapreduce.job.acl-view-job", 
    // "bigdata-app-fricadev-sdw-u6034690"); 
    if (HbaseBulkLoadMapperConstants.FUNDAMENTAL_ANALYTIC.equals(args[0])) { 
     HTableDescriptor descriptor = new HTableDescriptor(Bytes.toBytes(args[0])); 
     descriptor.addFamily(new HColumnDescriptor(COLUMN_FAMILY)); 
     HBaseAdmin admin = new HBaseAdmin(hbaseConf); 
     byte[] startKey = new byte[16]; 
     Arrays.fill(startKey, (byte) 0); 
     byte[] endKey = new byte[16]; 
     Arrays.fill(endKey, (byte) 255); 
     admin.createTable(descriptor, startKey, endKey, REGIONS_COUNT); 
     admin.close(); 
     // HColumnDescriptor hcd = new 
     // HColumnDescriptor(COLUMN_FAMILY).setMaxVersions(1); 
     // createPreSplitLoadTestTable(hbaseConf, descriptor, hcd); 
    } 

    job.getConfiguration().setBoolean("mapreduce.compress.map.output", true); 
    job.getConfiguration().setBoolean("mapreduce.map.output.compress", true); 
    job.getConfiguration().setBoolean("mapreduce.output.fileoutputformat.compress", true); 

    job.getConfiguration().setClass("mapreduce.map.output.compression.codec", 
     org.apache.hadoop.io.compress.GzipCodec.class, org.apache.hadoop.io.compress.CompressionCodec.class); 
    job.getConfiguration().set("hfile.compression", Compression.Algorithm.LZO.getName()); 

    // Connection connection = 
    // ConnectionFactory.createConnection(hbaseConf); 
    // Table table = connection.getTable(TableName.valueOf(args[0])); 
    FileInputFormat.setInputPaths(job, args[1]); 
    FileOutputFormat.setOutputPath(job, new Path(outputPath)); 

    job.setMapOutputValueClass(Put.class); 
    HFileOutputFormat.configureIncrementalLoad(job, new HTable(hbaseConf, args[0])); 

    System.exit(job.waitForCompletion(true) ? 0 : -1); 

    System.out.println("job is successfull.........."); 

    // LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseConf); 

    // loader.doBulkLoad(new Path(outputPath), (HTable) table); 

    HBaseBulkLoad.doBulkLoad(outputPath, args[0]); 

    } 

    /** 
    * Enum of counters. 
    * It used for collect statistics 
    */ 
    public static enum Counters { 
     /** 
     * Counts data format errors. 
     */ 
     WRONG_DATA_FORMAT_COUNTER 
} 
} 

在我的代码中只有mapper没有reducer。 我的映射器代码是这样的。

public class FundamentalAnalyticLoader implements TableLoader { 

    private ImmutableBytesWritable hbaseTableName; 
    private Text value; 
    private Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context; 
    private String strFileLocationAndDate; 

    @SuppressWarnings("unchecked") 
    public FundamentalAnalyticLoader(ImmutableBytesWritable hbaseTableName, Text value, Context context, 
     String strFileLocationAndDate) { 

    //System.out.println("Constructing Fundalmental Analytic Load"); 

    this.hbaseTableName = hbaseTableName; 
    this.value = value; 
    this.context = context; 
    this.strFileLocationAndDate = strFileLocationAndDate; 
    } 

    @SuppressWarnings("deprecation") 
    public void load() { 
    if (!HbaseBulkLoadMapperConstants.FF_ACTION.contains(value.toString())) { 

     String[] values = value.toString().split(HbaseBulkLoadMapperConstants.DATA_SEPERATOR); 
     String[] strArrFileLocationAndDate = strFileLocationAndDate 
      .split(HbaseBulkLoadMapperConstants.FIELD_SEPERATOR); 

     if (17 == values.length) { 
     String strKey = values[5].trim() + "|" + values[0].trim() + "|" + values[3].trim() + "|" 
      + values[4].trim() + "|" + values[14].trim() + "|" + strArrFileLocationAndDate[0].trim() + "|" 
      + strArrFileLocationAndDate[2].trim(); 

     //String strRowKey=StringUtils.leftPad(Integer.toString(Math.abs(strKey.hashCode() % 470)), 3, "0") + "|" + strKey; 
     byte[] hashedRowKey = HbaseBulkImportUtil.getHash(strKey); 
     Put put = new Put((hashedRowKey)); 


     put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY), 
      Bytes.toBytes(HbaseBulkLoadMapperConstants.FUNDAMENTAL_SERIES_ID), 
      Bytes.toBytes(values[0].trim())); 

     put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY), 
      Bytes.toBytes(HbaseBulkLoadMapperConstants.FUNDAMENTAL_SERIES_ID_OBJECT_TYPE_ID), 
      Bytes.toBytes(values[1].trim())); 

     put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY), 
      Bytes.toBytes(HbaseBulkLoadMapperConstants.FUNDAMENTAL_SERIES_ID_OBJECT_TYPE), 
      Bytes.toBytes(values[2])); 

     put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY), 
      Bytes.toBytes(HbaseBulkLoadMapperConstants.FINANCIAL_PERIOD_END_DATE), 
      Bytes.toBytes(values[3].trim())); 

     put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY), 
      Bytes.toBytes(HbaseBulkLoadMapperConstants.FINANCIAL_PERIOD_TYPE), 
      Bytes.toBytes(values[4].trim())); 

     put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY), 
      Bytes.toBytes(HbaseBulkLoadMapperConstants.LINE_ITEM_ID), Bytes.toBytes(values[5].trim())); 

     put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY), 
      Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_ITEM_INSTANCE_KEY), 
      Bytes.toBytes(values[6].trim())); 

     put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY), 
      Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_VALUE), Bytes.toBytes(values[7].trim())); 

     put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY), 
      Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_CONCEPT_CODE), 
      Bytes.toBytes(values[8].trim())); 

     put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY), 
      Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_VALUE_CURRENCY_ID), 
      Bytes.toBytes(values[9].trim())); 

     put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY), 
      Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_IS_ESTIMATED), 
      Bytes.toBytes(values[10].trim())); 

     put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY), 
      Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_AUDITABILITY_EQUATION), 
      Bytes.toBytes(values[11].trim())); 

     put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY), 
      Bytes.toBytes(HbaseBulkLoadMapperConstants.FINANCIAL_PERIOD_TYPE_ID), 
      Bytes.toBytes(values[12].trim())); 

     put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY), 
      Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_CONCEPT_ID), 
      Bytes.toBytes(values[13].trim())); 

     put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY), 
      Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_LINE_ITEM_IS_YEAR_TO_DATE), 
      Bytes.toBytes(values[14].trim())); 

     put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY), 
      Bytes.toBytes(HbaseBulkLoadMapperConstants.IS_ANNUAL), Bytes.toBytes(values[15].trim())); 

     // put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY), 
     // Bytes.toBytes(HbaseBulkLoadMapperConstants.TAXONOMY_ID), 
     // Bytes.toBytes(values[16].trim())); 
     // 
     // put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY), 
     // Bytes.toBytes(HbaseBulkLoadMapperConstants.INSTRUMENT_ID), 
     // Bytes.toBytes(values[17].trim())); 

     put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY), 
      Bytes.toBytes(HbaseBulkLoadMapperConstants.FF_ACTION), 
      Bytes.toBytes(values[16].substring(0, values[16].length() - 3))); 

     put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY), 
      Bytes.toBytes(HbaseBulkLoadMapperConstants.FILE_PARTITION), 
      Bytes.toBytes(strArrFileLocationAndDate[0].trim())); 

     put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY), 
      Bytes.toBytes(HbaseBulkLoadMapperConstants.FILE_PARTITION_DATE), 
      Bytes.toBytes(strArrFileLocationAndDate[2].trim())); 

     try { 
      context.write(hbaseTableName, put); 
     } catch (IOException e) { 
      context.getCounter(Counters.WRONG_DATA_FORMAT_COUNTER).increment(1); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

     } else { 

     System.out.println("Values length is less 15 and value is " + value.toString()); 
     } 

    } 
    } 

任何有助于提高速度的高度赞赏。 enter image description hereenter image description hereenter image description here专柜形象 here`

回答

2

我怀疑所有记录都进入单个区域。 当您创建空表时,HBase会将分隔的密钥地址空间分配在偶数范围内。但是因为所有实际的密钥共享相同的前缀,它们都会进入单个区域。这意味着单个区域/减少任务可以完成所有工作,而其他所有区域/减少任务都不会起任何作用。您可以通过查看Hadoop计数器来检查这个假设:与其他reduce任务相比,有多少个字节可以减慢任务读取/写入的速度。

如果这是问题,那么您需要手动准备拆分键并使用createTable(HTableDescriptor desc, byte[][] splitKeys创建表。拆分键应该均匀分配您的实际数据集以获得最佳性能。

示例#1。如果你的键是普通的英语单词,那么很容易通过第一个字符将表分成26个区域(分割键是'a','b',...,'z')。或者通过前两个字符('aa','ab',...,'zz')将它分割成26 * 26个区域。地区不一定是平均的,但是这比单一地区更好。

示例#2。如果你的密钥是4字节散列,那么很容易通过第一个字节(0x00,0x01,...,0xff)将表分成256个区域,或者通过前两个字节将表分成2^16个区域。

你的具体情况,我看到两个选项:

  1. 搜索最小键(有序)和数据集中的最大关键。并将它们用作startKeyendKeyAdmin.createTable()。只有在密钥均匀分布在startKeyendKey之间时,这才能正常工作。

  2. 用例子#2中的哈希(键)和使用方法前缀你的密钥。这应该可以很好地工作,但是您将无法进行类似(KEY> = $ {first}和KEY < = $ {last})的语义查询。

+0

我无法决定如何在createTable()中提供splitKeys。请给出一个例子吗? – SUDARSHAN

+0

@SUDARSHAN查看编辑我的答案。 – gudok

+0

感谢您的努力。我只是改变了,因为你已经逐行解释过了。但仍然发生同样的事情。我刚刚在3小时后杀死了我的代码。我的代码正在使用较小的数据集。我尝试过使用16 gb的数据集。我是gona接下来尝试50GB。但是对于完整的数据集来说,它是一样的。 – SUDARSHAN

0

晴如果作业被挂在最后一分钟或秒,那么问题可能是一个特别是具有并发问题等

小检查表可能是节点或资源: 1.用较小的数据集再试一次。这将排除代码的基本功能。 2.由于大部分工作都已完成,所以mapper和reducer可能会很好。您可以尝试几次使用相同体积的作业。日志可以帮助您确定相同的节点是否存在重复运行的问题。 3.验证输出是否按预期生成。 4.您还可以减少您尝试添加到HBase的列数。这将减轻相同体积的负载。

由于各种问题,可能会导致挂起的工作。但是排除故障主要包括以上几个步骤 - 验证原因,如果它的数据相关,资源相关,特定节点相关,内存相关等。

+0

我的代码正在与小datset。我又恢复了工作很多次,但以非常缓慢的速度在一小时内。我左边的1%运行26小时,但数据仍在进行中没有插入HBase的。不知道是什么去做 。 – SUDARSHAN