我的 cassandra scehma 在列族中只有一行。运行 mapreduce 时,映射器会一次又一次地读取同一行。因此,mapper 进入无穷大,reducer 卡住了....
这些是使用的配置
conf.set("fs.default.name", "hdfs://28.151.181.107:9000");
conf.set("mapred.job.tracker", "28.151.181.107:9001");
conf.setJar("C:\\hadoop-test\\demo\\target\\demo-0.0.1-SNAPSHOT.jar");
conf.setMapperClass(TokenizerMapper.class);
conf.setCombinerClass(ReducerToFilesystem.class);
conf.setReducerClass(ReducerToFilesystem.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(conf, new Path(resultFileName));
conf.setInputFormat(ColumnFamilyInputFormat.class);
ConfigHelper.setInputRpcPort(conf, PORT + "");
ConfigHelper.setInputInitialAddress(conf, HOST);
ConfigHelper.setInputPartitioner(conf, "RandomPartitioner");
ConfigHelper.setInputColumnFamily(conf, KEY_SPACE, COLUMN_FAMILY,true);
SlicePredicate predicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
sliceRange.setStart(new byte[0]);
sliceRange.setFinish(new byte[0]);
predicate.setSlice_range(sliceRange);
ConfigHelper.setInputSlicePredicate(conf, predicate);
ConfigHelper.setOutputInitialAddress(conf, HOST);
ConfigHelper.setOutputPartitioner(conf, "RandomPartitioner");
和 Mapper & Reducer 是
public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns,
OutputCollector<Text, Text> paramOutputCollector,
Reporter paramReporter) throws IOException {
DateSerializer sz = new DateSerializer();
StringSerializer s = new StringSerializer();
for (IColumn col : columns.values()) {
Date name = sz.fromByteBuffer(col.name());
double value = ByteBufferUtil.toDouble(col.value());
paramOutputCollector.collect(new Text(s.fromByteBuffer(key)),
new Text(name.toGMTString() + " [] [] " + value));
}
}
public static class ReducerToFilesystem implements
Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> paramOutputCollector,
Reporter paramReporter) throws IOException {
StringBuffer bfr = new StringBuffer();
while (values.hasNext()) {
Text val = values.next();
bfr.append(val);
bfr.append("<--->");
}
paramOutputCollector.collect(key, new Text(bfr.toString()));
}
请指导。
谢谢您的帮助!