55

现在我ResultScanner像这样实现行计数

for (Result rs = scanner.next(); rs != null; rs = scanner.next()) {
    number++;
}

如果数据达到数百万次计算量很大。我想实时计算我不想使用 Mapreduce

如何快速计算行数。

4

12 回答 12

108

在 HBase 中使用 RowCounter RowCounter 是一个 mapreduce 作业,用于计算表的所有行。这是一个很好的实用程序,可用作完整性检查,以确保 HBase 在存在元数据不一致问题时可以读取表的所有块。它将在单个进程中运行所有 mapreduce,但如果您有一个 MapReduce 集群可供它利用,它将运行得更快。

$ hbase org.apache.hadoop.hbase.mapreduce.RowCounter <tablename>

Usage: RowCounter [options] 
    <tablename> [          
        --starttime=[start] 
        --endtime=[end] 
        [--range=[startKey],[endKey]] 
        [<column1> <column2>...]
    ]
于 2013-02-14T10:11:31.000 回答
37

您可以使用 hbase 中的 count 方法来统计行数。但是,是的,计算大表的行数可能很慢。count 'tablename' [interval]

返回值是行数。

此操作可能需要很长时间(运行 '$HADOOP_HOME/bin/hadoop jar hbase.jar rowcount' 以运行计数 mapreduce 作业)。当前计数默认每 1000 行显示一次。可以选择指定计数间隔。默认情况下,对计数扫描启用扫描缓存。默认缓存大小为 10 行。如果您的行较小,您可能需要增加此参数。

例子:

hbase> count 't1'

hbase> count 't1', INTERVAL => 100000

hbase> count 't1', CACHE => 1000

hbase> count 't1', INTERVAL => 10, CACHE => 1000

相同的命令也可以在表引用上运行。假设您引用了表 't1',相应的命令将是:

hbase> t.count

hbase> t.count INTERVAL => 100000

hbase> t.count CACHE => 1000

hbase> t.count INTERVAL => 10, CACHE => 1000
于 2014-08-06T19:16:13.237 回答
12

如果您因任何原因无法使用RowCounter,那么这两个过滤器的组合应该是获得计数的最佳方式:

FirstKeyOnlyFilter() AND KeyOnlyFilter()

FirstKeyOnlyFilter将导致扫描器仅返回它找到的第一个列限定符,而不是扫描器返回表中的所有列限定符,这将最小化网络带宽。简单地选择一个列限定符返回怎么样?如果您可以保证每一行都存在列限定符,这将起作用,但如果这不是真的,那么您将得到不准确的计数。

KeyOnlyFilter将导致扫描器仅返回列族,并且不会为列限定符返回任何值。这进一步降低了网络带宽,在一般情况下不会造成太大的减少,但可能存在一种边缘情况,即前一个过滤器选择的第一列恰好是一个非常大的值。

我试着玩弄,scan.setCaching但结果到处都是。也许它会有所帮助。

我在开始和停止之间有 1600 万行,我做了以下伪经验测试:

激活 FirstKeyOnlyFilter 和 KeyOnlyFilter :

    未设置缓存(即默认值)时,需要 188 秒。
    缓存设置为 1,耗时 188 秒
    缓存设置为 10,需要 200 秒
    缓存设置为 100,耗时 187 秒
    缓存设置为 1000,耗时 183 秒。
    缓存设置为 10000,耗时 199 秒。
    缓存设置为 100000,耗时 199 秒。

禁用 FirstKeyOnlyFilter 和 KeyOnlyFilter:

    未设置缓存(即默认值),耗时 309 秒

我没有费心对此进行适当的测试,但似乎很明显FirstKeyOnlyFilterandKeyOnlyFilter是好的。

此外,这个特定表格中的单元格非常小 - 所以我认为过滤器在不同的表格上会更好。


这是一个 Java 代码示例:

导入 java.io.IOException;

导入 org.apache.hadoop.conf.Configuration;
导入 org.apache.hadoop.hbase.HBaseConfiguration;
导入 org.apache.hadoop.hbase.client.HTable;
导入 org.apache.hadoop.hbase.client.Result;
导入 org.apache.hadoop.hbase.client.ResultScanner;
导入 org.apache.hadoop.hbase.client.Scan;
导入 org.apache.hadoop.hbase.util.Bytes;

导入 org.apache.hadoop.hbase.filter.RowFilter;
导入 org.apache.hadoop.hbase.filter.KeyOnlyFilter;
导入 org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
导入 org.apache.hadoop.hbase.filter.FilterList;

导入 org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
导入 org.apache.hadoop.hbase.filter.RegexStringComparator;

公共类 HBaseCount {
    公共静态 void main(String[] args) 抛出 IOException {
        配置配置 = HBaseConfiguration.create();

        HTable 表 = 新 HTable(config, "my_table");

        扫描扫描=新扫描(
            Bytes.toBytes("foo"), Bytes.toBytes("foo~")
        );

        如果(args.length == 1){
            scan.setCaching(Integer.valueOf(args[0]));
        }
        System.out.println("scan的缓存是" + scan.getCaching());

        FilterList allFilters = new FilterList();
        allFilters.addFilter(new FirstKeyOnlyFilter());
        allFilters.addFilter(new KeyOnlyFilter());

        scan.setFilter(allFilters);

        ResultScanner 扫描仪 = table.getScanner(scan);

        整数计数 = 0;

        长开始 = System.currentTimeMillis();

        尝试 {
            for (结果 rr =scanner.next(); rr != null; rr =scanner.next()) {
                计数 += 1;
                if (count % 100000 == 0) System.out.println(count);
            }
        } 最后 {
            扫描仪.close();
        }

        长端 = System.currentTimeMillis();

        long elapsedTime = 结束 - 开始;

        System.out.println("经过的时间是 " + (elapsedTime/1000F));

    }
}


这是一个pychbase代码示例:

    从 pychbase 导入连接
    c = 连接()
    t = c.table('my_table')
    # 在后台这应用了 FirstKeyOnlyFilter 和 KeyOnlyFilter
    # 类似于下面的happybase示例
    打印 t.count(row_prefix="foo")

这是一个 Happybase 代码示例:

    从happybase导入连接
    c = 连接(...)
    t = c.table('my_table')
    计数 = 0
    对于 _ in t.scan(filter='FirstKeyOnlyFilter() AND KeyOnlyFilter()'):
        计数 += 1

    打印计数

感谢@Tuckr 和@KennyCason的提示。

于 2017-02-19T23:48:29.580 回答
9

使用HBase 附带的 HBase 行数映射/减少作业

于 2012-07-07T13:19:22.923 回答
5

在 HBASE 中计算行数的简单、有效和高效的方法:

  1. 每当您插入一行时,都会触发此 API,它将增加该特定单元格。

    Htable.incrementColumnValue(Bytes.toBytes("count"), Bytes.toBytes("details"), Bytes.toBytes("count"), 1);
    
  2. 检查该表中存在的行数。只需对该特定行“计数”使用“获取”或“扫描”API。

通过使用此方法,您可以在不到一毫秒的时间内获得行数。

于 2012-07-16T10:53:20.307 回答
5

要计算正确 YARN 集群上的 Hbase 表记录数,您还必须设置 map reduce 作业队列名称:

hbase org.apache.hadoop.hbase.mapreduce.RowCounter -Dmapreduce.job.queuename= < Your Q Name which you have SUBMIT access>
 < TABLE_NAME>
于 2017-08-29T22:49:02.180 回答
4

您可以使用自 HBase 0.92 以来可用的协处理器。请参阅协处理器AggregateProtocol示例

于 2012-07-07T20:20:44.297 回答
3

两种方法为我工作以速度从 hbase 表中获取行数

情景#1

如果 hbase 表大小很小,则使用有效用户登录到 hbase shell 并执行

>count '<tablename>'

例子

>count 'employee'

6 row(s) in 0.1110 seconds

情景#2

如果 hbase 表大小很大,则执行内置的 RowCounter map reduce 作业:使用有效用户登录到 hadoop 机器并执行:

/$HBASE_HOME/bin/hbase org.apache.hadoop.hbase.mapreduce.RowCounter '<tablename>'

例子:

 /$HBASE_HOME/bin/hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'employee'

     ....
     ....
     ....
     Virtual memory (bytes) snapshot=22594633728
                Total committed heap usage (bytes)=5093457920
        org.apache.hadoop.hbase.mapreduce.RowCounter$RowCounterMapper$Counters
                ROWS=6
        File Input Format Counters
                Bytes Read=0
        File Output Format Counters
                Bytes Written=0
于 2019-01-02T10:44:28.940 回答
1

如果您使用的是扫描仪,请在您的扫描仪中尝试让它返回尽可能少的限定符。实际上,您返回的限定符应该是可用的最小的(以字节为单位)。这将大大加快您的扫描速度。

不幸的是,这只会扩大到目前(数百万?)。更进一步,您可以实时执行此操作,但您首先需要运行 mapreduce 作业来计算所有行数。

将 Mapreduce 输出存储在 HBase 的一个单元中。每次添加一行,计数器加 1。每次删除一行,计数器减一。

当您需要实时访问行数时,您可以在 HBase 中读取该字段。

没有快速的方法来以可扩展的方式计算行数。你只能数这么快。

于 2012-07-17T21:40:23.977 回答
1

转到 Hbase 主目录并运行此命令,

./bin/hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'namespace:tablename'

这将启动一个 mapreduce 作业,输出将显示 hbase 表中存在的记录数。

于 2017-07-06T09:22:00.973 回答
1

你可以在这里找到示例:

/**
     * Used to get the number of rows of the table
     * @param tableName
     * @param familyNames
     * @return the number of rows
     * @throws IOException
     */
    public long countRows(String tableName, String... familyNames) throws IOException {
        long rowCount = 0;
        Configuration configuration = connection.getConfiguration();
        // Increase RPC timeout, in case of a slow computation
        configuration.setLong("hbase.rpc.timeout", 600000);
        // Default is 1, set to a higher value for faster scanner.next(..)
        configuration.setLong("hbase.client.scanner.caching", 1000);

        AggregationClient aggregationClient = new AggregationClient(configuration);
        try {
            Scan scan = new Scan();
            if (familyNames != null && familyNames.length > 0) {
                for (String familyName : familyNames) {
                    scan.addFamily(Bytes.toBytes(familyName));
                }
            }
            rowCount = aggregationClient.rowCount(TableName.valueOf(tableName), new LongColumnInterpreter(), scan);
        } catch (Throwable e) {
            throw new IOException(e);
        }
        return rowCount;
    }
于 2016-12-28T04:46:58.127 回答
-1

你可以试试 hbase api 方法!

org.apache.hadoop.hbase.client.coprocessor.AggregationClient

于 2016-02-03T09:09:16.077 回答