3

我是 Hive 和 MapReduce 的新手,非常感谢您的回答并提供正确的方法。

logs在配置单元中定义了一个外部表,该表在日期和源服务器上分区,外部位置在 hdfs 上/data/logs/。我有一个 MapReduce 作业,它获取这些日志文件并将它们拆分并存储在上述文件夹下。喜欢

"/data/logs/dt=2012-10-01/server01/"
"/data/logs/dt=2012-10-01/server02/"
...
...

从 MapReduce 作业中,我想将分区添加到 Hive 中的表日志中。我知道这两种方法

  1. alter table 命令 -- 太多的 alter table 命令
  2. 添加动态分区

对于方法二,我只看到INSERT OVERWRITE不适合我的示例。有没有办法在作业结束后将这些新分区添加到表中?

4

3 回答 3

3

要在 Map/Reduce 作业中执行此操作,我建议使用 Apache HCatalog,这是一个在 Hadoop 下标记的新项目。

HCatalog 确实是 HDFS 之上的抽象层,因此您可以以标准化的方式编写输出,无论是来自 Hive、Pig 还是 M/R。对您来说,这就是您可以使用输出格式从 Map/Reduce 作业直接加载 Hive 中的数据的地方HCatOutputFormat。下面是一个取自官网的例子。

为 (a=1,b=1) 写出特定分区的当前代码示例将如下所示:

Map<String, String> partitionValues = new HashMap<String, String>();
partitionValues.put("a", "1");
partitionValues.put("b", "1");
HCatTableInfo info = HCatTableInfo.getOutputTableInfo(dbName, tblName, partitionValues);
HCatOutputFormat.setOutput(job, info);

并且要写入多个分区,必须使用上述每个分区启动单独的作业。

您还可以将动态分区与 HCatalog 一起使用,在这种情况下,您可以在同一个作业中加载任意数量的分区!

我建议在上面提供的网站上进一步阅读 H​​Catalog,如果需要,它会为您提供更多详细信息。

于 2013-01-11T22:24:07.893 回答
3

实际上,事情比这要复杂一些,这很不幸,因为它在官方来源中没有记录(截至目前),而且需要几天的挫折才能弄清楚。

我发现我需要执行以下操作才能让 HCatalog Mapreduce 作业与写入动态分区一起工作:

在我工作的记录写入阶段(通常是 reducer),我必须手动将我的动态分区(HCatFieldSchema)添加到我的 HCatSchema 对象中。

问题是 HCatOutputFormat.getTableSchema(config) 实际上并没有返回分区字段。它们需要手动添加

HCatFieldSchema hfs1 = new HCatFieldSchema("date", Type.STRING, null);
HCatFieldSchema hfs2 = new HCatFieldSchema("some_partition", Type.STRING, null);
schema.append(hfs1);
schema.append(hfs2);
于 2014-04-22T00:06:12.110 回答
0

这是使用 HCatalog 在一项作业中写入具有动态分区的多个表的代码,该代码已在 Hadoop 2.5.0、Hive 0.13.1 上进行了测试:

// ... Job setup, InputFormatClass, etc ...
String dbName = null;
String[] tables = {"table0", "table1"};

job.setOutputFormatClass(MultiOutputFormat.class);
MultiOutputFormat.JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);

List<String> partitions = new ArrayList<String>();
partitions.add(0, "partition0");
partitions.add(1, "partition1");

HCatFieldSchema partition0 = new HCatFieldSchema("partition0", TypeInfoFactory.stringTypeInfo, null);
HCatFieldSchema partition1 = new HCatFieldSchema("partition1", TypeInfoFactory.stringTypeInfo, null);

for (String table : tables) {
    configurer.addOutputFormat(table, HCatOutputFormat.class, BytesWritable.class, CatRecord.class);

    OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, table, null);
    outputJobInfo.setDynamicPartitioningKeys(partitions);

    HCatOutputFormat.setOutput(
        configurer.getJob(table), outputJobInfo
    );

    HCatSchema schema = HCatOutputFormat.getTableSchema(configurer.getJob(table).getConfiguration());
    schema.append(partition0);
    schema.append(partition1);

    HCatOutputFormat.setSchema(
        configurer.getJob(table),
        schema
    );
}
configurer.configure();

return job.waitForCompletion(true) ? 0 : 1;

映射器:

public static class MyMapper extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        HCatRecord record = new DefaultHCatRecord(3); // Including partitions
        record.set(0, value.toString());

        // partitions must be set after non-partition fields
        record.set(1, "0"); // partition0=0
        record.set(2, "1"); // partition1=1

        MultiOutputFormat.write("table0", null, record, context);
        MultiOutputFormat.write("table1", null, record, context);
    }
}
于 2014-10-30T14:13:02.017 回答