希望这对其他人有帮助...
我发现,如果要将日志文件拆分为许多不同的文件(每个 event_type 一个文件),Hive 并不是合适的选择。恕我直言,Hive 提供的动态分区限制太多。
我最终的做法是编写一个自定义的 map-reduce jar。我还发现旧的 Hadoop 接口更合适,因为它提供了 MultipleTextOutputFormat 抽象类,可以让您实现 generateFileNameForKeyValue()。(新的 Hadoop 接口提供了另一种多输出文件机制:MultipleOutputs。如果输出位置是预先定义好的,它非常好用,但我不知道如何用它根据键值动态确定输出位置。)
示例代码:
/*
Run example:
hadoop jar DynamicSplit.jar DynamicEventSplit.DynamicEventSplitMultifileMapReduce /event/US/incoming/2013-01-01-01/ event US 2013-01-01-01 2 "[a-zA-Z0-9_ ]+" "/event/dynamicsplit1/" ","
*/
package DynamicEventSplit;
import java.io.*;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapred.lib.*;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
/**
 * Map-reduce job (old {@code org.apache.hadoop.mapred} API) that splits delimited log
 * lines into one output directory per event name.
 *
 * <p>The mapper keys every valid line by its event name, the reducer passes the pairs
 * through unchanged, and {@link MultiFileOutput} turns each key into a partitioned path
 * of the form {@code <basepath>event=<name>/site=<site>/year=.../month=.../day=.../hour=.../<leaf>}.
 *
 * <p>Command-line arguments, in order: input files, output directory, site, date
 * (for example {@code 2013-01-01-01}), 1-based event-name column number, event-name
 * regular expression, base path, column delimiter.
 */
public class DynamicEventSplitMultifileMapReduce
{
    /** Number of positional command-line arguments read by {@link #main(String[])}. */
    private static final int EXPECTED_ARG_COUNT = 8;

    /** Shortest usable date string: the hour is read from characters 11-12. */
    private static final int MIN_DATE_LENGTH = 13;

    /**
     * Checks that a date string is long enough for the fixed-position year, month, day
     * and hour substrings taken from it.
     *
     * @param date the date string to check, e.g. {@code 2013-01-01-01}
     * @return the same string, for call chaining
     * @throws IllegalArgumentException if {@code date} is null or shorter than 13 characters
     */
    private static String requireDate(String date)
    {
        if (date == null || date.length() < MIN_DATE_LENGTH)
        {
            throw new IllegalArgumentException(
                "date must look like yyyy-MM-dd-HH (at least " + MIN_DATE_LENGTH
                + " characters), got: " + date);
        }
        return date;
    }

    /**
     * Mapper that emits {@code (eventName, originalLine)} for every input line whose
     * configured column holds a name matching the configured regular expression.
     * Lines that are too short or whose event name does not match are dropped silently.
     */
    static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>
    {
        /** Compiled once per task instead of once per record. */
        private java.util.regex.Pattern eventNamePattern;
        /** 1-based index of the column that holds the event name. */
        private int eventNameColumnNumber;
        /** Delimiter handed to {@link String#split(String)}, so it is interpreted as a regex. */
        private String columnDelimiter=",";

        /**
         * Reads the mapper settings from the job configuration.
         *
         * @param job job configuration carrying {@code EventNameRegexp},
         *            {@code EventNameColumnNumber} and, optionally, {@code columndelimeter}
         * @throws IllegalArgumentException if the regular expression is missing or the
         *         column number is missing, not numeric, or smaller than 1
         */
        @Override
        public void configure(JobConf job)
        {
            String regexp=job.get("EventNameRegexp");
            if (regexp==null)
            {
                throw new IllegalArgumentException("Missing job property: EventNameRegexp");
            }
            eventNamePattern=java.util.regex.Pattern.compile(regexp);

            eventNameColumnNumber=Integer.parseInt(job.get("EventNameColumnNumber"));
            if (eventNameColumnNumber<1)
            {
                throw new IllegalArgumentException(
                    "EventNameColumnNumber is 1-based and must be >= 1, got: " + eventNameColumnNumber);
            }

            // Fall back to the default instead of overwriting it with null when unset.
            columnDelimiter=job.get("columndelimeter", ",");
        }

        /**
         * Emits the line keyed by its event name, or nothing if the line is invalid.
         *
         * @param key      input key supplied by the input format (unused)
         * @param value    one input line
         * @param output   collector receiving {@code (eventName, line)}
         * @param reporter progress reporter (unused)
         * @throws IOException if the collector fails
         */
        @Override
        public void map(LongWritable key, Text value,OutputCollector<Text, Text> output, Reporter reporter) throws IOException
        {
            String[] columns=value.toString().split(columnDelimiter);
            // The event-name column must exist on this line.
            if (columns.length<eventNameColumnNumber)
            {
                return;
            }
            String eventName=columns[eventNameColumnNumber-1];
            // The event name later becomes part of an output path, so only accept
            // names that match the configured pattern.
            if (!eventNamePattern.matcher(eventName).matches())
            {
                return;
            }
            // Key by the validated column rather than a fixed index, so the output
            // file name always comes from the value that was just checked.
            output.collect(new Text(eventName),value);
        }
    }

    /**
     * Identity reducer: forwards every {@code (key, value)} pair unchanged so the
     * output format can route each pair by its key.
     */
    static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text>
    {
        /**
         * Writes each value of the group back out under the same key.
         *
         * @param key      event name shared by the group
         * @param values   lines collected for that event name
         * @param output   collector receiving the unchanged pairs
         * @param reporter progress reporter (unused)
         * @throws IOException if the collector fails
         */
        @Override
        public void reduce(Text key, Iterator<Text> values,OutputCollector<Text, Text> output, Reporter reporter) throws IOException
        {
            while (values.hasNext())
            {
                output.collect(key, values.next());
            }
        }
    }

    /**
     * Output format that derives the output file path from the record key (the event
     * name) plus the site, date parts and base path stored in the job configuration.
     */
    static class MultiFileOutput extends MultipleTextOutputFormat<Text, Text>
    {
        private String site;
        private String year;
        private String month;
        private String day;
        private String hour;
        private String basepath;

        /**
         * Caches the path components from the job configuration, then delegates to the
         * parent implementation to create the writer.
         *
         * @param fs   file system passed through to the parent
         * @param job  job configuration carrying {@code site}, {@code date} and {@code basepath}
         * @param name leaf file name passed through to the parent
         * @param arg3 progress callback passed through to the parent
         * @return the record writer created by the parent class
         * @throws IOException if the parent fails to create the writer
         * @throws IllegalArgumentException if {@code date} is missing or too short
         */
        @Override
        public RecordWriter<Text,Text> getRecordWriter(FileSystem fs, JobConf job,String name, Progressable arg3) throws IOException
        {
            // Populate the fields first so they are set before any file name is generated.
            site=job.get("site");
            String eventDate=requireDate(job.get("date"));
            // Fixed positions: yyyy?MM?dd?HH, where ? is any single separator character.
            year=eventDate.substring(0,4);
            month=eventDate.substring(5,7);
            day=eventDate.substring(8,10);
            hour=eventDate.substring(11,13);
            basepath=job.get("basepath");
            return super.getRecordWriter(fs, job, name, arg3);
        }

        /**
         * Builds the partition-style output path for one record.
         *
         * @param key   event name
         * @param value the record's line (unused)
         * @param leaf  leaf file name supplied by the framework
         * @return {@code basepath} followed by the event, site, year, month, day and hour
         *         directories and the leaf name
         */
        @Override
        protected String generateFileNameForKeyValue(Text key, Text value,String leaf)
        {
            String eventName=key.toString();
            return basepath+"event="+eventName+"/site="+site+"/year="+year+"/month="+month+"/day="+day+"/hour="+hour+"/"+leaf;
        }

        /**
         * Returns null as the key to write.
         * NOTE(review): presumably this makes the text writer emit only the original
         * line, without the event-name key in front; confirm against the Hadoop docs.
         *
         * @param key   event name (unused)
         * @param value the record's line (unused)
         * @return always {@code null}
         */
        @Override
        protected Text generateActualKey(Text key, Text value)
        {
            return null;
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args input files, output directory, site, date, event-name column number,
     *             event-name regular expression, base path, column delimiter
     * @throws Exception if the job cannot be submitted or fails
     */
    public static void main(String[] args) throws Exception
    {
        if (args.length<EXPECTED_ARG_COUNT)
        {
            System.err.println("Usage: DynamicEventSplitMultifileMapReduce <inputFiles> <outputDir> <site>"
                + " <date yyyy-MM-dd-HH> <eventNameColumnNumber> <eventNameRegexp> <basepath> <columnDelimiter>");
            System.exit(2);
        }
        String inputFiles=args[0];
        String outputDir=args[1];
        String site=args[2];
        // Fail before submission rather than inside every task.
        String date=requireDate(args[3]);
        String eventNameColumnNumber=args[4];
        String eventNameRegexp=args[5];
        String basepath=args[6];
        String columnDelimiter=args[7];

        JobConf conf = new JobConf(new Configuration(),DynamicEventSplitMultifileMapReduce.class);

        // Settings read back by Map.configure and MultiFileOutput.getRecordWriter.
        conf.set("site",site);
        conf.set("date",date);
        conf.set("EventNameColumnNumber",eventNameColumnNumber);
        conf.set("EventNameRegexp",eventNameRegexp);
        conf.set("basepath",basepath);
        conf.set("columndelimeter",columnDelimiter);

        conf.setOutputKeyClass(Text.class);
        conf.setMapOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(MultiFileOutput.class);
        // NOTE(review): speculative execution is disabled, presumably so that duplicate
        // task attempts do not write to the same generated file paths; confirm.
        conf.setMapSpeculativeExecution(false);
        conf.setReduceSpeculativeExecution(false);

        FileInputFormat.setInputPaths(conf,inputFiles);
        FileOutputFormat.setOutputPath(conf,new Path("/"+outputDir+site+date+"/"));

        JobClient.runJob(conf);
    }
}