2012-07-07 150 views
33

的数量现在我实现行数超过ResultScanner这样HBase的快速计算行

for (Result rs = scanner.next(); rs != null; rs = scanner.next()) { 
    number++; 
} 

如果数据达到了数以百万计的时间计算是large.I要计算在,我不希望使用实时Mapreduce

如何快速计算行数。

回答

7

是已经被包含HBase的

+0

我用的行数示例的源代码,并保存结果在一个变量中,我得到了计数器:'job.getCounters()。findCounter(RowCounter.RowCounterMapper.Counters.ROWS).getValue();' – Paschalis 2013-02-18 00:59:14

4

简单,有效的方式在HBASE数行:

  1. 当你插入一行触发这个API,这将增加该特定细胞。

    Htable.incrementColumnValue(Bytes.toBytes("count"), Bytes.toBytes("details"), Bytes.toBytes("count"), 1); 
    
  2. 检查该表中存在的行数。只需使用“获取”或“扫描”API即可查看该特定行的“数量”。

通过使用此方法,您可以在少于一毫秒内获得行计数。

+1

这是一个好方法。但时间hbase使用增量是hbase放数据的较大时间。 – cldo 2012-11-27 07:35:03

+1

如果该行已存在并更新了该怎么办?这可以计算额外的行,对吗? – Paschalis 2013-02-18 00:43:05

+0

没有。我想告诉'时间hbase使用增量是更大的时间'。我想跑得更快 – cldo 2013-03-15 17:58:03

1

如果您正在使用扫描仪,请在扫描仪中尝试尽可能返回最少量的限定符。事实上,你所返回的限定符应该是最小的(以字节大小),因为你有可用的。这将极大地加快您的扫描速度。

不幸的是,这只能扩展到目前为止(数百亿?)。要进一步研究,您可以实时执行此操作,但您首先需要运行mapreduce作业来统计所有行。

将Mapreduce输出存储在HBase的单元中。每次添加一行时,都将计数器增加1.每次删除一行时,都会减少计数器。

当您需要实时访问行数时,您可以在HBase中读取该字段。

没有快速的方式来计算行,否则以一种可缩放的方式。你只能计算得这么快。

+2

事实上,您可以使用'FirstKeyOnlyFilter'来代替“返回尽可能少的限定符”。作为扫描过滤器 – 2013-07-21 11:46:12

+0

@KennyCason“FirstKeyOnlyFilter”究竟做了什么?从[thrift docs](我无法理解这个解释:'[FirstKeyOnlyFilter]只返回每行的第一个键值 - 这是否意味着它只是选取第一个单元格并返回该单元格? – 2017-02-19 20:41:23

+0

@KennyCason在测试结束后好了,它会选择第一个单元格,并返回唯一的单元格。为什么你会建议通过@Tucker的建议返回最小限定符?例如,如果FirstKeyOnlyFilter选择的第一个键值具有一个非常大的值,那么这会减慢扫描的速度;另一方面,如果选择具有最小值的限定符,但该限定符不会出现在所有要计数的行中,那么您将获得一个不准确的计数 – 2017-02-19 20:54:37

75

在HBase中使用RowCounter RowCounter是一个mapreduce作业来统计表的所有行。这是一个很好的实用工具,可以用作健全性检查,以确保HBase可以读取表中所有块,如果有任何元数据不一致的担忧。它将在单个进程中运行mapreduce,但如果您有MapReduce群集供其利用,它将运行得更快。

$ hbase org.apache.hadoop.hbase.mapreduce.RowCounter <tablename> 

Usage: RowCounter [options] 
    <tablename> [   
     --starttime=[start] 
     --endtime=[end] 
     [--range=[startKey],[endKey]] 
     [<column1> <column2>...] 
    ] 
+1

@cldo,你应该接受这个答案 – WattsInABox 2016-01-28 17:18:45

+1

K,跑这个,在哪里打印答案? 'org.apache.hadoop.hbase.mapreduce.RowCounter $ RowCounterMapper $ Counters ROWS = 55438' < - 那它? – samthebest 2016-02-15 15:37:16

22

您可以使用hbase中的count方法来计算行数。但是,是的,计算一个大表的行可以是slow.count'tablename'[interval]

返回值是行数。

此操作可能需要很长时间(运行'$ HADOOP_HOME/bin/hadoop jar hbase.jar rowcount'来运行计数mapreduce作业)。默认情况下,当前计数每1000行显示为 。计数间隔可以选择指定。扫描 默认情况下,计数扫描启用缓存。默认缓存大小为10行。 如果行的规模都很小,你可能想提高这个 参数。

实例:

hbase> count 't1' 

hbase> count 't1', INTERVAL => 100000 

hbase> count 't1', CACHE => 1000 

hbase> count 't1', INTERVAL => 10, CACHE => 1000 

相同的命令也可以对表的参考运行。假设你有到餐桌“T1”的引用,相应的命令是:

hbase> t.count 

hbase> t.count INTERVAL => 100000 

hbase> t.count CACHE => 1000 

hbase> t.count INTERVAL => 10, CACHE => 1000 
+7

该计数器运行速度很慢,只能从hbase shell访问。对于大表不推荐使用。 – articuno 2015-06-02 13:44:55

+0

@articuno确切地 – 2017-02-06 08:59:22

-1

你可以试试HBase的API方法!

org.apache.hadoop.hbase.client.coprocessor.AggregationClient

+0

您能否为您的答案提供更多的上下文以及一些相关文档的链接? – Suever 2016-02-13 03:11:26

1

U可以在这里找到样本例如:

/** 
    * Used to get the number of rows of the table 
    * @param tableName 
    * @param familyNames 
    * @return the number of rows 
    * @throws IOException 
    */ 
    public long countRows(String tableName, String... familyNames) throws IOException { 
     long rowCount = 0; 
     Configuration configuration = connection.getConfiguration(); 
     // Increase RPC timeout, in case of a slow computation 
     configuration.setLong("hbase.rpc.timeout", 600000); 
     // Default is 1, set to a higher value for faster scanner.next(..) 
     configuration.setLong("hbase.client.scanner.caching", 1000); 

     AggregationClient aggregationClient = new AggregationClient(configuration); 
     try { 
      Scan scan = new Scan(); 
      if (familyNames != null && familyNames.length > 0) { 
       for (String familyName : familyNames) { 
        scan.addFamily(Bytes.toBytes(familyName)); 
       } 
      } 
      rowCount = aggregationClient.rowCount(TableName.valueOf(tableName), new LongColumnInterpreter(), scan); 
     } catch (Throwable e) { 
      throw new IOException(e); 
     } 
     return rowCount; 
    } 
+0

有没有办法证明'configuration.setLong(“hbase.client.scanner。缓存“,1000);'工作?例如,如果我设置它,并且稍后调用'scanner.getCaching()',它将返回'-1'。 – 2017-02-20 03:28:27

5

如果你不能不管出于什么原因,然后结合使用RowCounter这两个过滤器应该是获得计数的最佳方式:

FirstKeyOnlyFilter() AND KeyOnlyFilter() 

FirstKeyOnlyFilter将导致仅扫描器返回第一柱限定符它发现,相对于扫描仪返回所有在表中的列限定符,这将最小化网络带宽的。简单地选择一个列限定符返回?这将工作,如果你能机制保障该列资格赛存在的每一行,但如果这是不正确的,那么你会得到一个不准确的计数。

KeyOnlyFilter将导致扫描仪只能返回列族,并且不会对列预选赛返回任何值。这进一步降低了网络带宽,在一般情况下,这并不会大幅减少,但是可能会出现边缘情况,其中前一个过滤器挑选的第一列恰好是一个非常大的值。

我试着玩scan.setCaching,但结果全是这个地方。也许它可以帮助。

我在开始之间有1600万行,停止我做了以下伪实证检验:

 
With FirstKeyOnlyFilter and KeyOnlyFilter activated: 

    With caching not set (i.e., the default value), it took 188 seconds. 
    With caching set to 1, it took 188 seconds 
    With caching set to 10, it took 200 seconds 
    With caching set to 100, it took 187 seconds 
    With caching set to 1000, it took 183 seconds. 
    With caching set to 10000, it took 199 seconds. 
    With caching set to 100000, it took 199 seconds. 

With FirstKeyOnlyFilter and KeyOnlyFilter disabled: 

    With caching not set, (i.e., the default value), it took 309 seconds 

我也懒得做这个适当的测试,但它似乎很清楚的是, FirstKeyOnlyFilterKeyOnlyFilter都不错。

此外,这个特定表格中的单元格非常小 - 所以我认为过滤器在不同的表格上会更好。


这里是一个Java代码示例:

 
import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.client.HTable; 
import org.apache.hadoop.hbase.client.Result; 
import org.apache.hadoop.hbase.client.ResultScanner; 
import org.apache.hadoop.hbase.client.Scan; 
import org.apache.hadoop.hbase.util.Bytes; 

import org.apache.hadoop.hbase.filter.RowFilter; 
import org.apache.hadoop.hbase.filter.KeyOnlyFilter; 
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; 
import org.apache.hadoop.hbase.filter.FilterList; 

import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 
import org.apache.hadoop.hbase.filter.RegexStringComparator; 

public class HBaseCount { 
    public static void main(String[] args) throws IOException { 
     Configuration config = HBaseConfiguration.create(); 

     HTable table = new HTable(config, "my_table"); 

     Scan scan = new Scan(
      Bytes.toBytes("foo"), Bytes.toBytes("foo~") 
     ); 

     if (args.length == 1) { 
      scan.setCaching(Integer.valueOf(args[0])); 
     } 
     System.out.println("scan's caching is " + scan.getCaching()); 

     FilterList allFilters = new FilterList(); 
     allFilters.addFilter(new FirstKeyOnlyFilter()); 
     allFilters.addFilter(new KeyOnlyFilter()); 

     scan.setFilter(allFilters); 

     ResultScanner scanner = table.getScanner(scan); 

     int count = 0; 

     long start = System.currentTimeMillis(); 

     try { 
      for (Result rr = scanner.next(); rr != null; rr = scanner.next()) { 
       count += 1; 
       if (count % 100000 == 0) System.out.println(count); 
      } 
     } finally { 
      scanner.close(); 
     } 

     long end = System.currentTimeMillis(); 

     long elapsedTime = end - start; 

     System.out.println("Elapsed time was " + (elapsedTime/1000F)); 

    } 
} 


这里是一个pychbase代码示例:

 
    from pychbase import Connection 
    c = Connection() 
    t = c.table('my_table') 
    # Under the hood this applies the FirstKeyOnlyFilter and KeyOnlyFilter 
    # similar to the happybase example below 
    print t.count(row_prefix="foo") 

这里是一个Happybase代码示例:

 
    from happybase import Connection 
    c = Connection(...) 
    t = c.table('my_table') 
    count = 0 
    for _ in t.scan(filter='FirstKeyOnlyFilter() AND KeyOnlyFilter()'): 
     count += 1 

    print count 

由于@Tuckr and @KennyCason的小费。

0

转到HBase的主目录,并运行此命令,

./bin/hbase org.apache.hadoop.hbase.mapreduce.RowCounter '命名空间:表名'

这将启动的MapReduce作业,输出将显示hbase表中存在的记录数。

2

要计算一个适当的纱线集群上的HBase表的记录数,您必须将地图缩小作业队列名称,以及:

hbase org.apache.hadoop.hbase.mapreduce.RowCounter -Dmapreduce.job.queuename= < Your Q Name which you have SUBMIT access> 
< TABLE_NAME>