这是使用 HCatalog 在一项作业中写入具有动态分区的多个表的代码,该代码已在 Hadoop 2.5.0、Hive 0.13.1 上进行了测试:
// ... Job setup, InputFormatClass, etc ...
// Database that holds the target tables. The value is handed unchanged to
// OutputJobInfo.create; per the HCatalog docs a null name selects the default database.
String dbName = null;
String[] tables = {"table0", "table1"};

// MultiOutputFormat lets one job write to several outputs, each addressed by an alias.
job.setOutputFormatClass(MultiOutputFormat.class);
MultiOutputFormat.JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);

// Names of the dynamic partition columns, in the order they are appended to the schema below.
List<String> partitions = new ArrayList<String>();
partitions.add("partition0");
partitions.add("partition1");

// Schema entries for the two partition columns (both string-typed, no comment).
HCatFieldSchema partition0 = new HCatFieldSchema("partition0", TypeInfoFactory.stringTypeInfo, null);
HCatFieldSchema partition1 = new HCatFieldSchema("partition1", TypeInfoFactory.stringTypeInfo, null);

for (String table : tables) {
    // Register one HCatalog output per table, using the table name as the alias.
    // The value class must be HCatRecord: that is the type the mapper emits.
    configurer.addOutputFormat(table, HCatOutputFormat.class, BytesWritable.class, HCatRecord.class);

    // A null partition-value map plus explicit dynamic partitioning keys means the
    // partition values are taken from each record rather than fixed up front.
    OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, table, null);
    outputJobInfo.setDynamicPartitioningKeys(partitions);
    HCatOutputFormat.setOutput(configurer.getJob(table), outputJobInfo);

    // Start from the schema HCatalog reports for the table and append the partition
    // columns, so the output schema matches the records (data fields first, partitions last).
    HCatSchema schema = HCatOutputFormat.getTableSchema(configurer.getJob(table).getConfiguration());
    schema.append(partition0);
    schema.append(partition1);
    HCatOutputFormat.setSchema(configurer.getJob(table), schema);
}

// Fold the per-alias job settings back into the main job before submitting it.
configurer.configure();
return job.waitForCompletion(true) ? 0 : 1;
映射器:
/**
 * Mapper that wraps each input line in a three-field HCatalog record (one data field
 * followed by two partition fields) and writes that record to both the "table0" and
 * "table1" outputs.
 */
public static class MyMapper extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        final String line = value.toString();

        // Three slots: the data field plus the two partition fields.
        final HCatRecord row = new DefaultHCatRecord(3);
        row.set(0, line);
        // Partition fields go after the non-partition fields.
        row.set(1, "0"); // partition0=0
        row.set(2, "1"); // partition1=1

        // Emit the same record to every configured output alias, in order.
        final String[] aliases = {"table0", "table1"};
        for (String alias : aliases) {
            MultiOutputFormat.write(alias, null, row, context);
        }
    }
}