I wrote a Cassandra word count job in Eclipse and ran it on Hadoop. The job reports 0 output records, and my output table in Cassandra stays empty. Why is this happening?
Here is my code:
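// Imports assume the Cassandra 1.2.x Thrift/Hadoop API (IColumn) and the Hadoop 1.x mapreduce API
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.StringTokenizer;

import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordcountCass extends Configured implements Tool
{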
    public static class TokenizerMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, IntWritable>
    {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private ByteBuffer sourceColumn;
        // Regex patterns (hence the escapes) for punctuation removed before tokenizing
        private static final String[] punctuationsToStrip = { "\"", "'", ",", ";", "!", ":", "\\?", "\\.", "\\(", "\\-", "\\[", "\\)", "\\]" };

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Name of the input column to read, set by run() in the job configuration
            sourceColumn = ByteBufferUtil.bytes(context.getConfiguration().get("columnname"));
        }

        @Override
        public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException
        {
            // Our slice predicate contains only one column. We fetch it here
            IColumn column = columns.get(sourceColumn);
            // A row without this column produces no map output
            if (column == null)
                return;
            String value = ByteBufferUtil.string(column.value());
            value = value.toLowerCase();
            for (String pattern : punctuationsToStrip) {
                value = value.replaceAll(pattern, "");
            }
            // Emit (word, 1) for every whitespace-separated token
            StringTokenizer itr = new StringTokenizer(value);
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
    public static class ReducerToCassandra extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
    {
        private ByteBuffer outputKey;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException
        {
            // The row key is the name of the column from which we read the text
            outputKey = ByteBufferUtil.bytes(context.getConfiguration().get("columnname"));
        }

        @Override
        public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            // One mutation per word: column name = the word, value = the count as a string
            context.write(outputKey, Collections.singletonList(getMutation(word, sum)));
        }

        // See Cassandra API (http://wiki.apache.org/cassandra/API)
        private static Mutation getMutation(Text word, int sum)
        {
            Column c = new Column();
            // Text.getBytes() returns the backing array, which may be longer than the text,
            // so copy only the first getLength() bytes
            c.setName(Arrays.copyOf(word.getBytes(), word.getLength()));
            c.setValue(ByteBufferUtil.bytes(String.valueOf(sum)));
            c.setTimestamp(System.currentTimeMillis());
            Mutation m = new Mutation();
            m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
            m.column_or_supercolumn.setColumn(c);
            return m;
        }
    }
    public int run(String[] args) throws Exception
    {
        String columnName = "name";
        // Pass the input column name to the mapper and reducer through the job configuration
        getConf().set("columnname", columnName);
        Job job = new Job(getConf(), "wordcount");
        job.setJarByClass(WordcountCass.class);
        job.setMapperClass(TokenizerMapper.class);
        // Tell the Mapper to expect Cassandra columns as input
        job.setInputFormatClass(ColumnFamilyInputFormat.class);
        // Tell the "Shuffle/Sort" phase of M/R what type of Key/Value to expect from the mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(ReducerToCassandra.class);
        // Write the reducer's (row key, list of mutations) pairs to Cassandra
        job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
        job.setOutputKeyClass(ByteBuffer.class);
        job.setOutputValueClass(List.class);
        // Set the keyspace and column family for the output of this job
        ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "wordcount", "outputword");
        // Set the keyspace and column family for the input of this job
        ConfigHelper.setInputColumnFamily(job.getConfiguration(), "wordcount", "inputword");
        ConfigHelper.setRangeBatchSize(job.getConfiguration(), 500);
        ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
        ConfigHelper.setInputInitialAddress(job.getConfiguration(), "127.0.0.1");
        ConfigHelper.setInputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
        ConfigHelper.setOutputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
        ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "127.0.0.1");
        // Set the predicate that determines what columns will be selected from each row
        SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes(columnName)));
        // The "get_slice" (see Cassandra's API) operation will be applied on each row of the ColumnFamily.
        // Each row is then passed to one call of map().
        ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
        // Block until the job finishes; return 0 on success, 1 on failure
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws Exception
    {
        // Let ToolRunner handle generic command-line options, and exit with the job's status
        System.exit(ToolRunner.run(new Configuration(), new WordcountCass(), args));
    }
}
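To check the mapper's text handling separately from Hadoop and Cassandra, here is a minimal standalone sketch. `TokenizeSketch` is a hypothetical class, not part of the job; it applies the same lower-casing, punctuation stripping, and `StringTokenizer` splitting as `TokenizerMapper.map()`. Assuming the `name` column is actually found in each row, every single-word value should yield exactly one word, so every row of inputword would produce one map output record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical standalone copy of the text processing in TokenizerMapper.map(),
// with the Hadoop/Cassandra types removed so it can run on its own.
public class TokenizeSketch {
    // Same regex patterns as punctuationsToStrip in the mapper
    private static final String[] PUNCTUATIONS_TO_STRIP = {
        "\"", "'", ",", ";", "!", ":", "\\?", "\\.", "\\(", "\\-", "\\[", "\\)", "\\]"
    };

    // Lower-cases the value, strips the punctuation, and splits on whitespace,
    // returning the words the mapper would emit (each with a count of 1).
    public static List<String> tokenize(String value) {
        String v = value.toLowerCase();
        for (String pattern : PUNCTUATIONS_TO_STRIP) {
            v = v.replaceAll(pattern, "");
        }
        List<String> words = new ArrayList<>();
        StringTokenizer itr = new StringTokenizer(v);
        while (itr.hasMoreTokens()) {
            words.add(itr.nextToken());
        }
        return words;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("zanjan"));          // [zanjan]
        System.out.println(tokenize("Hello, (world)!")); // [hello, world]
    }
}
```

Because this logic emits a word for any non-empty value, zero map output records with ten map input records point at the `column == null` early return rather than at the tokenizing.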
Here is the console output:
13/09/12 10:43:36 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/09/12 10:43:36 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
13/09/12 10:43:39 INFO mapred.JobClient: Running job: job_local1513748539_0001
13/09/12 10:43:39 INFO mapred.LocalJobRunner: Waiting for map tasks
13/09/12 10:43:39 INFO mapred.LocalJobRunner: Starting task: attempt_local1513748539_0001_m_000000_0
13/09/12 10:43:39 INFO util.ProcessTree: setsid exited with exit code 0
13/09/12 10:43:39 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@4204
13/09/12 10:43:39 INFO mapred.MapTask: Processing split: ColumnFamilySplit((-1, '1684704676388456087] @[localhost])
13/09/12 10:43:40 INFO mapred.JobClient: map 0% reduce 0%
13/09/12 10:43:40 INFO mapred.MapTask: io.sort.mb = 100
13/09/12 10:43:41 INFO mapred.MapTask: data buffer = 79691776/99614720
13/09/12 10:43:41 INFO mapred.MapTask: record buffer = 262144/327680
13/09/12 10:43:41 INFO mapred.MapTask: Starting flush of map output
13/09/12 10:43:41 INFO mapred.Task: Task:attempt_local1513748539_0001_m_000000_0 is done. And is in the process of commiting
13/09/12 10:43:41 INFO mapred.LocalJobRunner:
13/09/12 10:43:41 INFO mapred.Task: Task 'attempt_local1513748539_0001_m_000000_0' done.
13/09/12 10:43:41 INFO mapred.LocalJobRunner: Finishing task: attempt_local1513748539_0001_m_000000_0
13/09/12 10:43:41 INFO mapred.LocalJobRunner: Starting task: attempt_local1513748539_0001_m_000001_0
13/09/12 10:43:41 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@29ac
13/09/12 10:43:41 INFO mapred.MapTask: Processing split: ColumnFamilySplit((1684704676388456087, '-1] @[localhost])
13/09/12 10:43:41 INFO mapred.MapTask: io.sort.mb = 100
13/09/12 10:43:42 INFO mapred.MapTask: data buffer = 79691776/99614720
13/09/12 10:43:42 INFO mapred.MapTask: record buffer = 262144/327680
13/09/12 10:43:42 INFO mapred.JobClient: map 50% reduce 0%
13/09/12 10:43:42 INFO mapred.MapTask: Starting flush of map output
13/09/12 10:43:42 INFO mapred.Task: Task:attempt_local1513748539_0001_m_000001_0 is done. And is in the process of commiting
13/09/12 10:43:42 INFO mapred.LocalJobRunner:
13/09/12 10:43:42 INFO mapred.Task: Task 'attempt_local1513748539_0001_m_000001_0' done.
13/09/12 10:43:42 INFO mapred.LocalJobRunner: Finishing task: attempt_local1513748539_0001_m_000001_0
13/09/12 10:43:42 INFO mapred.LocalJobRunner: Map task executor complete.
13/09/12 10:43:42 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@3bc20e
13/09/12 10:43:42 INFO mapred.LocalJobRunner:
13/09/12 10:43:42 INFO mapred.Merger: Merging 2 sorted segments
13/09/12 10:43:42 INFO mapred.Merger: Down to the last merge-pass, with 0 segments left of total size: 0 bytes
13/09/12 10:43:42 INFO mapred.LocalJobRunner:
13/09/12 10:43:42 INFO mapred.Task: Task:attempt_local1513748539_0001_r_000000_0 is done. And is in the process of commiting
13/09/12 10:43:42 INFO mapred.LocalJobRunner: reduce > reduce
13/09/12 10:43:42 INFO mapred.Task: Task 'attempt_local1513748539_0001_r_000000_0' done.
13/09/12 10:43:42 WARN mapred.FileOutputCommitter: Output path is null in cleanup
13/09/12 10:43:43 INFO mapred.JobClient: map 100% reduce 100%
13/09/12 10:43:43 INFO mapred.JobClient: Job complete: job_local1513748539_0001
13/09/12 10:43:43 INFO mapred.JobClient: Counters: 20
13/09/12 10:43:43 INFO mapred.JobClient: File Output Format Counters
13/09/12 10:43:43 INFO mapred.JobClient: Bytes Written=0
13/09/12 10:43:43 INFO mapred.JobClient: FileSystemCounters
13/09/12 10:43:43 INFO mapred.JobClient: FILE_BYTES_READ=1123
13/09/12 10:43:43 INFO mapred.JobClient: FILE_BYTES_WRITTEN=163894
13/09/12 10:43:43 INFO mapred.JobClient: File Input Format Counters
13/09/12 10:43:43 INFO mapred.JobClient: Bytes Read=0
13/09/12 10:43:43 INFO mapred.JobClient: Map-Reduce Framework
13/09/12 10:43:43 INFO mapred.JobClient: Map output materialized bytes=12
13/09/12 10:43:43 INFO mapred.JobClient: Map input records=10
13/09/12 10:43:43 INFO mapred.JobClient: Reduce shuffle bytes=0
13/09/12 10:43:43 INFO mapred.JobClient: Spilled Records=0
13/09/12 10:43:43 INFO mapred.JobClient: Map output bytes=0
13/09/12 10:43:43 INFO mapred.JobClient: Total committed heap usage (bytes)=666501120
13/09/12 10:43:43 INFO mapred.JobClient: CPU time spent (ms)=0
13/09/12 10:43:43 INFO mapred.JobClient: SPLIT_RAW_BYTES=172
13/09/12 10:43:43 INFO mapred.JobClient: Combine input records=0
13/09/12 10:43:43 INFO mapred.JobClient: Reduce input records=0
13/09/12 10:43:43 INFO mapred.JobClient: Reduce input groups=0
13/09/12 10:43:43 INFO mapred.JobClient: Combine output records=0
13/09/12 10:43:43 INFO mapred.JobClient: Physical memory (bytes) snapshot=0
13/09/12 10:43:43 INFO mapred.JobClient: Reduce output records=0
13/09/12 10:43:43 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0
13/09/12 10:43:43 INFO mapred.JobClient: Map output records=0
My tables in Cassandra are as follows. inputword:
id | name | locate
----+-----------+--------
5 | abhar | zanjan
10 | mahneshan | zanjan
1 | zanjan | zanjan
8 | abbar | zanjan
2 | zanjan | zanjan
4 | abhar | zanjan
7 | abbar | zanjan
6 | gheidar | zanjan
9 | abbar | zanjan
3 | zanjan | zanjan
And outputword:
id | name | number
----+-----------+--------
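For comparison, here is a hypothetical in-memory sketch (plain Java, no Hadoop or Cassandra; the class `ExpectedCounts` is made up for illustration) of what the job should compute from the ten inputword rows above: the map step splits each `name` value into words and the reduce step sums a count of 1 per word. Punctuation stripping is left out because the sample names contain none. If the job worked as written, `ReducerToCassandra` would store these counts as columns of a single row whose key is the string `name` (`outputKey`), with each word as a column name and its count as a string value.

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the job: "map" splits each name value
// into words, "reduce" sums one count per occurrence of each word.
public class ExpectedCounts {
    public static Map<String, Integer> count(String[] names) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted by word for readable output
        for (String name : names) {
            StringTokenizer itr = new StringTokenizer(name.toLowerCase());
            while (itr.hasMoreTokens()) {
                counts.merge(itr.nextToken(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // The name column of the ten inputword rows, in the order cqlsh listed them
        String[] names = {
            "abhar", "mahneshan", "zanjan", "abbar", "zanjan",
            "abhar", "abbar", "gheidar", "abbar", "zanjan"
        };
        // Prints {abbar=3, abhar=2, gheidar=1, mahneshan=1, zanjan=3}
        System.out.println(count(names));
    }
}
```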
Why is there no output?