9

我正在考虑使用 HBase 作为我的 MapReduce 作业之一的来源。我知道 TableInputFormat 为每个区域指定一个输入拆分(因此是一个映射器)。但是,这似乎效率低下。我真的很想让多个映射器同时在一个给定的区域上工作。我可以通过扩展 TableInputFormatBase 来实现这一点吗?你能给我举个例子吗?此外,这甚至是一个好主意吗?

谢谢您的帮助。

4

5 回答 5

3

您需要一个扩展 InputFormat 的自定义输入格式。您可以从回答到我想扫描大量数据(基于范围的查询)的问题中了解如何做到这一点,在写入数据时我可以做哪些优化,以便扫描变得更快。如果数据处理时间比数据检索时间长,这是一个好主意。

于 2012-07-06T15:55:20.583 回答
1

不确定是否可以为给定区域指定多个映射器,但请考虑以下事项:

如果您认为每个区域使用一个映射器效率低下(可能您的数据节点没有像 #cpus 这样的足够资源),您或许可以在文件 hbase-site.xml 中指定较小的区域大小。

如果您想考虑更改默认配置选项,这里是一个站点:http: //hbase.apache.org/configuration.html#hbase_default_configurations

请注意,通过缩小区域大小,您将增加 DFS 中的文件数量,这可能会根据您的 namenode 的内存限制 hadoop DFS 的容量。请记住,namenode 的内存使用与 DFS 中的文件数量直接相关。这可能与您的情况相关,也可能不相关,因为我不知道您的集群是如何使用的。这些问题从来没有灵丹妙药的答案!

于 2012-07-04T05:05:44.473 回答
0

1. 只需确保映射器之间的键集是互斥的,就可以了。

  1. 您没有创建太多客户端,因为这可能会导致大量 gc,因为在 hbase 读取期间会发生 hbase 块缓存搅动
于 2015-04-28T18:16:45.443 回答
0

使用此 MultipleScanTableInputFormat,您可以使用 MultipleScanTableInputFormat.PARTITIONS_PER_REGION_SERVER 配置来控制应针对单个区域服务器执行的映射器数量。该类将按其位置(区域服务器)对所有输入拆分进行分组,并且 RecordReader 将正确地遍历映射器的所有聚合拆分。

这是示例

https://gist.github.com/bbeaudreault/9788499#file-multiplescantableinputformat-java-L90

您为单个映射器创建了多个聚合拆分的工作

private List<InputSplit> getAggregatedSplits(JobContext context) throws IOException {
final List<InputSplit> aggregatedSplits = new ArrayList<InputSplit>();

final Scan scan = getScan();

for (int i = 0; i < startRows.size(); i++) {
  scan.setStartRow(startRows.get(i));
  scan.setStopRow(stopRows.get(i));

  setScan(scan);

  aggregatedSplits.addAll(super.getSplits(context));
}

// set the state back to where it was.. 
scan.setStopRow(null);
scan.setStartRow(null);

setScan(scan);

return aggregatedSplits;
 }

按区域服务器创建分区

 @Override
 public List<InputSplit> getSplits(JobContext context) throws IOException {
List<InputSplit> source = getAggregatedSplits(context);

if (!partitionByRegionServer) {
  return source;
}

// Partition by regionserver
Multimap<String, TableSplit> partitioned = ArrayListMultimap.<String, TableSplit>create();
for (InputSplit split : source) {
  TableSplit cast = (TableSplit) split;
  String rs = cast.getRegionLocation();

  partitioned.put(rs, cast);
}
于 2016-12-08T08:51:03.200 回答
-1

如果您想使用仅找到几条记录的条件扫描来扫描大区域(数亿行),这将很有用。这将防止 ScannerTimeoutException

package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;

public class RegionSplitTableInputFormat extends TableInputFormat {

    public static final String REGION_SPLIT = "region.split";

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException {

        Configuration conf = context.getConfiguration();
        int regionSplitCount = conf.getInt(REGION_SPLIT, 0);
        List<InputSplit> superSplits = super.getSplits(context);
        if (regionSplitCount <= 0) {
            return superSplits;
        }

        List<InputSplit> splits = new ArrayList<InputSplit>(superSplits.size() * regionSplitCount);

        for (InputSplit inputSplit : superSplits) {
            TableSplit tableSplit = (TableSplit) inputSplit;
            System.out.println("splitting by " + regionSplitCount + " " + tableSplit);
            byte[] startRow0 = tableSplit.getStartRow();
            byte[] endRow0 = tableSplit.getEndRow();
            boolean discardLastSplit = false;
            if (endRow0.length == 0) {
                endRow0 = new byte[startRow0.length];
                Arrays.fill(endRow0, (byte) 255);
                discardLastSplit = true;
            }
            byte[][] split = Bytes.split(startRow0, endRow0, regionSplitCount);
            if (discardLastSplit) {
                split[split.length - 1] = new byte[0];
            }
            for (int regionSplit = 0; regionSplit < split.length - 1; regionSplit++) {
                byte[] startRow = split[regionSplit];
                byte[] endRow = split[regionSplit + 1];
                TableSplit newSplit = new TableSplit(tableSplit.getTableName(), startRow, endRow,
                        tableSplit.getLocations()[0]);
                splits.add(newSplit);
            }
        }

        return splits;
    }
}
于 2015-01-22T15:20:32.537 回答