我正在考虑使用 HBase 作为我的 MapReduce 作业之一的来源。我知道 TableInputFormat 为每个区域指定一个输入拆分(因此是一个映射器)。但是,这似乎效率低下。我真的很想让多个映射器同时在一个给定的区域上工作。我可以通过扩展 TableInputFormatBase 来实现这一点吗?你能给我举个例子吗?此外,这甚至是一个好主意吗?
谢谢您的帮助。
我正在考虑使用 HBase 作为我的 MapReduce 作业之一的来源。我知道 TableInputFormat 为每个区域指定一个输入拆分(因此是一个映射器)。但是,这似乎效率低下。我真的很想让多个映射器同时在一个给定的区域上工作。我可以通过扩展 TableInputFormatBase 来实现这一点吗?你能给我举个例子吗?此外,这甚至是一个好主意吗?
谢谢您的帮助。
您需要一个扩展 InputFormat 的自定义输入格式。您可以从回答到我想扫描大量数据(基于范围的查询)的问题中了解如何做到这一点,在写入数据时我可以做哪些优化,以便扫描变得更快。如果数据处理时间比数据检索时间长,这是一个好主意。
不确定是否可以为给定区域指定多个映射器,但请考虑以下事项:
如果您认为每个区域使用一个映射器效率低下(可能您的数据节点没有像 #cpus 这样的足够资源),您或许可以在文件 hbase-site.xml 中指定较小的区域大小。
如果您想考虑更改默认配置选项,这里是一个站点:http: //hbase.apache.org/configuration.html#hbase_default_configurations
请注意,通过缩小区域大小,您将增加 DFS 中的文件数量,这可能会根据您的 namenode 的内存限制 hadoop DFS 的容量。请记住,namenode 的内存使用与 DFS 中的文件数量直接相关。这可能与您的情况相关,也可能不相关,因为我不知道您的集群是如何使用的。这些问题从来没有灵丹妙药的答案!
1. 只需确保映射器之间的键集是互斥的,就可以了。
使用此 MultipleScanTableInputFormat,您可以使用 MultipleScanTableInputFormat.PARTITIONS_PER_REGION_SERVER 配置来控制应针对单个区域服务器执行的映射器数量。该类将按其位置(区域服务器)对所有输入拆分进行分组,并且 RecordReader 将正确地遍历映射器的所有聚合拆分。
这是示例
https://gist.github.com/bbeaudreault/9788499#file-multiplescantableinputformat-java-L90
您为单个映射器创建了多个聚合拆分的工作
private List<InputSplit> getAggregatedSplits(JobContext context) throws IOException {
final List<InputSplit> aggregatedSplits = new ArrayList<InputSplit>();
final Scan scan = getScan();
for (int i = 0; i < startRows.size(); i++) {
scan.setStartRow(startRows.get(i));
scan.setStopRow(stopRows.get(i));
setScan(scan);
aggregatedSplits.addAll(super.getSplits(context));
}
// set the state back to where it was..
scan.setStopRow(null);
scan.setStartRow(null);
setScan(scan);
return aggregatedSplits;
}
按区域服务器创建分区
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
List<InputSplit> source = getAggregatedSplits(context);
if (!partitionByRegionServer) {
return source;
}
// Partition by regionserver
Multimap<String, TableSplit> partitioned = ArrayListMultimap.<String, TableSplit>create();
for (InputSplit split : source) {
TableSplit cast = (TableSplit) split;
String rs = cast.getRegionLocation();
partitioned.put(rs, cast);
}
如果您想使用仅找到几条记录的条件扫描来扫描大区域(数亿行),这将很有用。这将防止 ScannerTimeoutException
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
public class RegionSplitTableInputFormat extends TableInputFormat {
public static final String REGION_SPLIT = "region.split";
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
Configuration conf = context.getConfiguration();
int regionSplitCount = conf.getInt(REGION_SPLIT, 0);
List<InputSplit> superSplits = super.getSplits(context);
if (regionSplitCount <= 0) {
return superSplits;
}
List<InputSplit> splits = new ArrayList<InputSplit>(superSplits.size() * regionSplitCount);
for (InputSplit inputSplit : superSplits) {
TableSplit tableSplit = (TableSplit) inputSplit;
System.out.println("splitting by " + regionSplitCount + " " + tableSplit);
byte[] startRow0 = tableSplit.getStartRow();
byte[] endRow0 = tableSplit.getEndRow();
boolean discardLastSplit = false;
if (endRow0.length == 0) {
endRow0 = new byte[startRow0.length];
Arrays.fill(endRow0, (byte) 255);
discardLastSplit = true;
}
byte[][] split = Bytes.split(startRow0, endRow0, regionSplitCount);
if (discardLastSplit) {
split[split.length - 1] = new byte[0];
}
for (int regionSplit = 0; regionSplit < split.length - 1; regionSplit++) {
byte[] startRow = split[regionSplit];
byte[] endRow = split[regionSplit + 1];
TableSplit newSplit = new TableSplit(tableSplit.getTableName(), startRow, endRow,
tableSplit.getLocations()[0]);
splits.add(newSplit);
}
}
return splits;
}
}