现在我ResultScanner
像这样实现行计数
for (Result rs = scanner.next(); rs != null; rs = scanner.next()) {
number++;
}
如果数据达到数百万次计算量很大。我想实时计算我不想使用 Mapreduce
如何快速计算行数。
在 HBase 中使用 RowCounter RowCounter 是一个 mapreduce 作业,用于计算表的所有行。这是一个很好的实用程序,可用作完整性检查,以确保 HBase 在存在元数据不一致问题时可以读取表的所有块。它将在单个进程中运行所有 mapreduce,但如果您有一个 MapReduce 集群可供它利用,它将运行得更快。
$ hbase org.apache.hadoop.hbase.mapreduce.RowCounter <tablename>
Usage: RowCounter [options]
<tablename> [
--starttime=[start]
--endtime=[end]
[--range=[startKey],[endKey]]
[<column1> <column2>...]
]
您可以使用 hbase 中的 count 方法来统计行数。但是,是的,计算大表的行数可能很慢。count 'tablename' [interval]
返回值是行数。
此操作可能需要很长时间(运行 '$HADOOP_HOME/bin/hadoop jar hbase.jar rowcount' 以运行计数 mapreduce 作业)。当前计数默认每 1000 行显示一次。可以选择指定计数间隔。默认情况下,对计数扫描启用扫描缓存。默认缓存大小为 10 行。如果您的行较小,您可能需要增加此参数。
例子:
hbase> count 't1'
hbase> count 't1', INTERVAL => 100000
hbase> count 't1', CACHE => 1000
hbase> count 't1', INTERVAL => 10, CACHE => 1000
相同的命令也可以在表引用上运行。假设您引用了表 't1',相应的命令将是:
hbase> t.count
hbase> t.count INTERVAL => 100000
hbase> t.count CACHE => 1000
hbase> t.count INTERVAL => 10, CACHE => 1000
如果您因任何原因无法使用RowCounter
,那么这两个过滤器的组合应该是获得计数的最佳方式:
FirstKeyOnlyFilter() AND KeyOnlyFilter()
这FirstKeyOnlyFilter
将导致扫描器仅返回它找到的第一个列限定符,而不是扫描器返回表中的所有列限定符,这将最小化网络带宽。简单地选择一个列限定符返回怎么样?如果您可以保证每一行都存在列限定符,这将起作用,但如果这不是真的,那么您将得到不准确的计数。
这KeyOnlyFilter
将导致扫描器仅返回列族,并且不会为列限定符返回任何值。这进一步降低了网络带宽,在一般情况下不会造成太大的减少,但可能存在一种边缘情况,即前一个过滤器选择的第一列恰好是一个非常大的值。
我试着玩弄,scan.setCaching
但结果到处都是。也许它会有所帮助。
我在开始和停止之间有 1600 万行,我做了以下伪经验测试:
激活 FirstKeyOnlyFilter 和 KeyOnlyFilter : 未设置缓存(即默认值)时,需要 188 秒。 缓存设置为 1,耗时 188 秒 缓存设置为 10,需要 200 秒 缓存设置为 100,耗时 187 秒 缓存设置为 1000,耗时 183 秒。 缓存设置为 10000,耗时 199 秒。 缓存设置为 100000,耗时 199 秒。 禁用 FirstKeyOnlyFilter 和 KeyOnlyFilter: 未设置缓存(即默认值),耗时 309 秒
我没有费心对此进行适当的测试,但似乎很明显FirstKeyOnlyFilter
andKeyOnlyFilter
是好的。
此外,这个特定表格中的单元格非常小 - 所以我认为过滤器在不同的表格上会更好。
这是一个 Java 代码示例:
导入 java.io.IOException; 导入 org.apache.hadoop.conf.Configuration; 导入 org.apache.hadoop.hbase.HBaseConfiguration; 导入 org.apache.hadoop.hbase.client.HTable; 导入 org.apache.hadoop.hbase.client.Result; 导入 org.apache.hadoop.hbase.client.ResultScanner; 导入 org.apache.hadoop.hbase.client.Scan; 导入 org.apache.hadoop.hbase.util.Bytes; 导入 org.apache.hadoop.hbase.filter.RowFilter; 导入 org.apache.hadoop.hbase.filter.KeyOnlyFilter; 导入 org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; 导入 org.apache.hadoop.hbase.filter.FilterList; 导入 org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 导入 org.apache.hadoop.hbase.filter.RegexStringComparator; 公共类 HBaseCount { 公共静态 void main(String[] args) 抛出 IOException { 配置配置 = HBaseConfiguration.create(); HTable 表 = 新 HTable(config, "my_table"); 扫描扫描=新扫描( Bytes.toBytes("foo"), Bytes.toBytes("foo~") ); 如果(args.length == 1){ scan.setCaching(Integer.valueOf(args[0])); } System.out.println("scan的缓存是" + scan.getCaching()); FilterList allFilters = new FilterList(); allFilters.addFilter(new FirstKeyOnlyFilter()); allFilters.addFilter(new KeyOnlyFilter()); scan.setFilter(allFilters); ResultScanner 扫描仪 = table.getScanner(scan); 整数计数 = 0; 长开始 = System.currentTimeMillis(); 尝试 { for (结果 rr =scanner.next(); rr != null; rr =scanner.next()) { 计数 += 1; if (count % 100000 == 0) System.out.println(count); } } 最后 { 扫描仪.close(); } 长端 = System.currentTimeMillis(); long elapsedTime = 结束 - 开始; System.out.println("经过的时间是 " + (elapsedTime/1000F)); } }
这是一个pychbase代码示例:
从 pychbase 导入连接 c = 连接() t = c.table('my_table') # 在后台这应用了 FirstKeyOnlyFilter 和 KeyOnlyFilter # 类似于下面的happybase示例 打印 t.count(row_prefix="foo")
这是一个 Happybase 代码示例:
从happybase导入连接 c = 连接(...) t = c.table('my_table') 计数 = 0 对于 _ in t.scan(filter='FirstKeyOnlyFilter() AND KeyOnlyFilter()'): 计数 += 1 打印计数
感谢@Tuckr 和@KennyCason的提示。
在 HBASE 中计算行数的简单、有效和高效的方法:
每当您插入一行时,都会触发此 API,它将增加该特定单元格。
Htable.incrementColumnValue(Bytes.toBytes("count"), Bytes.toBytes("details"), Bytes.toBytes("count"), 1);
检查该表中存在的行数。只需对该特定行“计数”使用“获取”或“扫描”API。
通过使用此方法,您可以在不到一毫秒的时间内获得行数。
要计算正确 YARN 集群上的 Hbase 表记录数,您还必须设置 map reduce 作业队列名称:
hbase org.apache.hadoop.hbase.mapreduce.RowCounter -Dmapreduce.job.queuename= < Your Q Name which you have SUBMIT access>
< TABLE_NAME>
您可以使用自 HBase 0.92 以来可用的协处理器。请参阅协处理器和AggregateProtocol和示例
两种方法为我工作以速度从 hbase 表中获取行数
情景#1
如果 hbase 表大小很小,则使用有效用户登录到 hbase shell 并执行
>count '<tablename>'
例子
>count 'employee'
6 row(s) in 0.1110 seconds
情景#2
如果 hbase 表大小很大,则执行内置的 RowCounter map reduce 作业:使用有效用户登录到 hadoop 机器并执行:
/$HBASE_HOME/bin/hbase org.apache.hadoop.hbase.mapreduce.RowCounter '<tablename>'
例子:
/$HBASE_HOME/bin/hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'employee'
....
....
....
Virtual memory (bytes) snapshot=22594633728
Total committed heap usage (bytes)=5093457920
org.apache.hadoop.hbase.mapreduce.RowCounter$RowCounterMapper$Counters
ROWS=6
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=0
如果您使用的是扫描仪,请在您的扫描仪中尝试让它返回尽可能少的限定符。实际上,您返回的限定符应该是可用的最小的(以字节为单位)。这将大大加快您的扫描速度。
不幸的是,这只会扩大到目前(数百万?)。更进一步,您可以实时执行此操作,但您首先需要运行 mapreduce 作业来计算所有行数。
将 Mapreduce 输出存储在 HBase 的一个单元中。每次添加一行,计数器加 1。每次删除一行,计数器减一。
当您需要实时访问行数时,您可以在 HBase 中读取该字段。
没有快速的方法来以可扩展的方式计算行数。你只能数这么快。
转到 Hbase 主目录并运行此命令,
./bin/hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'namespace:tablename'
这将启动一个 mapreduce 作业,输出将显示 hbase 表中存在的记录数。
你可以在这里找到示例:
/**
* Used to get the number of rows of the table
* @param tableName
* @param familyNames
* @return the number of rows
* @throws IOException
*/
public long countRows(String tableName, String... familyNames) throws IOException {
long rowCount = 0;
Configuration configuration = connection.getConfiguration();
// Increase RPC timeout, in case of a slow computation
configuration.setLong("hbase.rpc.timeout", 600000);
// Default is 1, set to a higher value for faster scanner.next(..)
configuration.setLong("hbase.client.scanner.caching", 1000);
AggregationClient aggregationClient = new AggregationClient(configuration);
try {
Scan scan = new Scan();
if (familyNames != null && familyNames.length > 0) {
for (String familyName : familyNames) {
scan.addFamily(Bytes.toBytes(familyName));
}
}
rowCount = aggregationClient.rowCount(TableName.valueOf(tableName), new LongColumnInterpreter(), scan);
} catch (Throwable e) {
throw new IOException(e);
}
return rowCount;
}
你可以试试 hbase api 方法!
org.apache.hadoop.hbase.client.coprocessor.AggregationClient