我最近设置了 4 节点 Cassandra 集群,用于使用一个包含时间序列数据的列族进行学习。
Key -> {column name: timeUUID, column value: csv log line, ttl: 1year},我使用Netflix Astyanax java客户端加载了大约100万条日志行。
我还将 Hadoop 配置为运行具有 1 个名称节点和 4 个数据节点的 map-reduce 作业,以对 Cassandra 数据运行一些分析。
互联网上所有可用的示例都使用列名作为 Hadoop 作业配置的 SlicePredicate,其中我有 timeUUID 作为列,我如何有效地将 Cassandra 数据提供给 Hadoop 作业配置器,一次批处理 1000 列。
此测试数据中的某些行有超过 10000 列,并且预计在实际数据中会更多。
我将我的工作配置为
public int run(String[] arg0) throws Exception {
Job job = new Job(getConf(), JOB_NAME);
Job.setJarByClass(LogTypeCounterByDate.class);
job.setMapperClass(LogTypeCounterByDateMapper.class);
job.setReducerClass(LogTypeCounterByDateReducer.class);
job.setInputFormatClass(ColumnFamilyInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
ConfigHelper.setRangeBatchSize(getConf(), 1000);
SliceRange sliceRange = new SliceRange(ByteBuffer.wrap(new byte[0]),
ByteBuffer.wrap(new byte[0]), true, 1000);
SlicePredicate slicePredicate = new SlicePredicate();
slicePredicate.setSlice_range(sliceRange);
ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
ConfigHelper.setInputRpcPort(job.getConfiguration(), INPUT_RPC_PORT);
ConfigHelper.setInputInitialAddress(job.getConfiguration(), INPUT_INITIAL_ADRESS);
ConfigHelper.setInputPartitioner(job.getConfiguration(), INPUT_PARTITIONER);
ConfigHelper.setInputSlicePredicate(job.getConfiguration(), slicePredicate);
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
但我无法理解我是如何定义 Mapper 的,请您提供 Mapper 类的模板。
public static class LogTypeCounterByDateMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, LongWritable>
{
private Text key = null;
private LongWritable value = null;
@Override
protected void setup(Context context){
}
public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context){
//String[] lines = columns.;
}
}