0

我在 Eclipse 中编写了 cassandra wordcount 并在 hadoop 上运行。运行时它显示输出 records 为 0,而我在 cassandra 中的输出表为空。为什么会这样?

这是我的代码:

  public class WordcountCass extends Configured implements Tool
{

    public static class TokenizerMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, IntWritable>
    {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private ByteBuffer sourceColumn;

        String punctuationsToStrip[] = { "\"", "'", ",", ";", "!", ":", "\\?", "\\.", "\\(", "\\-", "\\[", "\\)", "\\]" };

        protected void setup(Mapper.Context context) throws IOException, InterruptedException {
            sourceColumn = ByteBufferUtil.bytes(context.getConfiguration().get("columnname"));
        }

        public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException
        {
            // Our slice predicate contains only one column. We fetch it here
            IColumn column = columns.get(sourceColumn);
            if (column == null)
                return;
            String value = ByteBufferUtil.string(column.value());

            value = value.toLowerCase();
            for (String pattern : punctuationsToStrip) {
              value = value.replaceAll(pattern, "");
            }

            StringTokenizer itr = new StringTokenizer(value);
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class ReducerToCassandra extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
    {
        private ByteBuffer outputKey;

        protected void setup(Reducer.Context context) throws IOException, InterruptedException
        {
            // The row key is the name of the column from which we read the text
            outputKey = ByteBufferUtil.bytes(context.getConfiguration().get("columnname"));
        }

        public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(outputKey, Collections.singletonList(getMutation(word, sum)));
        }

        // See Cassandra API (http://wiki.apache.org/cassandra/API)
        private static Mutation getMutation(Text word, int sum)
        {
            Column c = new Column();
            c.setName(Arrays.copyOf(word.getBytes(), word.getLength()));
            c.setValue(ByteBufferUtil.bytes(String.valueOf(sum)));
            c.setTimestamp(System.currentTimeMillis());

            Mutation m = new Mutation();
            m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
            m.column_or_supercolumn.setColumn(c);
            return m;
        }
    }

    public int run(String[] args) throws Exception
    {
        String columnName = "name";
        getConf().set("columnname", columnName);

        //Configuration conf = new Configuration();
        Job job = new Job(getConf(), "wordcount");
        job.setJarByClass(WordcountCass.class);

        job.setMapperClass(TokenizerMapper.class);
        // Tell the Mapper to expect Cassandra columns as input
        job.setInputFormatClass(ColumnFamilyInputFormat.class);

        // Tell the "Shuffle/Sort" phase of M/R what type of Key/Value to expect from the mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(ReducerToCassandra.class);
        job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
        job.setOutputKeyClass(ByteBuffer.class);
        job.setOutputValueClass(List.class);

        // Set the keyspace and column family for the output of this job
        ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "wordcount", "outputword");
        // Set the keyspace and column family for the input of this job
        ConfigHelper.setInputColumnFamily(job.getConfiguration(), "wordcount", "inputword");
        ConfigHelper.setRangeBatchSize(job.getConfiguration(), 500);

        ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
        ConfigHelper.setInputInitialAddress(job.getConfiguration(), "127.0.0.1");
        ConfigHelper.setInputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
        ConfigHelper.setOutputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
        ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "127.0.0.1");
       // ConfigHelper.getOutputPartitioner(job.getConfiguration());
        // Set the predicate that determines what columns will be selected from each row

        SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes(columnName)));
        // The "get_slice" (see Cassandra's API) operation will be applied on each row of the ColumnFamily.
        // Each row will be handled by one Map job.
        ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0:1;
    }

    public static void main(String[] args) throws Exception
    {
        // Let ToolRunner handle generic command-line options
        ToolRunner.run(new Configuration(), new WordcountCass(), args);
        System.exit(0);
    }
}

这是输出控制台:

 13/09/12 10:43:36 INFO util.NativeCodeLoader: Loaded the native-hadoop library
 13/09/12 10:43:36 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
 13/09/12 10:43:39 INFO mapred.JobClient: Running job: job_local1513748539_0001
 13/09/12 10:43:39 INFO mapred.LocalJobRunner: Waiting for map tasks
 13/09/12 10:43:39 INFO mapred.LocalJobRunner: Starting task: attempt_local1513748539_0001_m_000000_0
 13/09/12 10:43:39 INFO util.ProcessTree: setsid exited with exit code 0
 13/09/12 10:43:39 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@4204
 13/09/12 10:43:39 INFO mapred.MapTask: Processing split: ColumnFamilySplit((-1, '1684704676388456087] @[localhost])
 13/09/12 10:43:40 INFO mapred.JobClient:  map 0% reduce 0%
 13/09/12 10:43:40 INFO mapred.MapTask: io.sort.mb = 100
 13/09/12 10:43:41 INFO mapred.MapTask: data buffer = 79691776/99614720
 13/09/12 10:43:41 INFO mapred.MapTask: record buffer = 262144/327680
 13/09/12 10:43:41 INFO mapred.MapTask: Starting flush of map output
 13/09/12 10:43:41 INFO mapred.Task: Task:attempt_local1513748539_0001_m_000000_0 is done. And is in the process of commiting
 13/09/12 10:43:41 INFO mapred.LocalJobRunner: 
 13/09/12 10:43:41 INFO mapred.Task: Task 'attempt_local1513748539_0001_m_000000_0' done.
 13/09/12 10:43:41 INFO mapred.LocalJobRunner: Finishing task: attempt_local1513748539_0001_m_000000_0
 13/09/12 10:43:41 INFO mapred.LocalJobRunner: Starting task: attempt_local1513748539_0001_m_000001_0
 13/09/12 10:43:41 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@29ac
 13/09/12 10:43:41 INFO mapred.MapTask: Processing split: ColumnFamilySplit((1684704676388456087, '-1] @[localhost])
 13/09/12 10:43:41 INFO mapred.MapTask: io.sort.mb = 100
 13/09/12 10:43:42 INFO mapred.MapTask: data buffer = 79691776/99614720
 13/09/12 10:43:42 INFO mapred.MapTask: record buffer = 262144/327680
 13/09/12 10:43:42 INFO mapred.JobClient:  map 50% reduce 0%
 13/09/12 10:43:42 INFO mapred.MapTask: Starting flush of map output
 13/09/12 10:43:42 INFO mapred.Task: Task:attempt_local1513748539_0001_m_000001_0 is done. And is in the process of commiting
 13/09/12 10:43:42 INFO mapred.LocalJobRunner: 
 13/09/12 10:43:42 INFO mapred.Task: Task 'attempt_local1513748539_0001_m_000001_0' done.
 13/09/12 10:43:42 INFO mapred.LocalJobRunner: Finishing task:attempt_local1513748539_0001_m_000001_0
13/09/12 10:43:42 INFO mapred.LocalJobRunner: Map task executor complete.
13/09/12 10:43:42 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@3bc20e
13/09/12 10:43:42 INFO mapred.LocalJobRunner: 
13/09/12 10:43:42 INFO mapred.Merger: Merging 2 sorted segments
13/09/12 10:43:42 INFO mapred.Merger: Down to the last merge-pass, with 0 segments left of total size: 0 bytes
13/09/12 10:43:42 INFO mapred.LocalJobRunner: 
13/09/12 10:43:42 INFO mapred.Task: Task:attempt_local1513748539_0001_r_000000_0 is done. And is in the process of commiting
13/09/12 10:43:42 INFO mapred.LocalJobRunner: reduce > reduce
13/09/12 10:43:42 INFO mapred.Task: Task 'attempt_local1513748539_0001_r_000000_0' done.
13/09/12 10:43:42 WARN mapred.FileOutputCommitter: Output path is null in cleanup
13/09/12 10:43:43 INFO mapred.JobClient:  map 100% reduce 100%
13/09/12 10:43:43 INFO mapred.JobClient: Job complete: job_local1513748539_0001
13/09/12 10:43:43 INFO mapred.JobClient: Counters: 20
13/09/12 10:43:43 INFO mapred.JobClient:   File Output Format Counters 
13/09/12 10:43:43 INFO mapred.JobClient:     Bytes Written=0
13/09/12 10:43:43 INFO mapred.JobClient:   FileSystemCounters
13/09/12 10:43:43 INFO mapred.JobClient:     FILE_BYTES_READ=1123
13/09/12 10:43:43 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=163894
13/09/12 10:43:43 INFO mapred.JobClient:   File Input Format Counters 
13/09/12 10:43:43 INFO mapred.JobClient:     Bytes Read=0
13/09/12 10:43:43 INFO mapred.JobClient:   Map-Reduce Framework
13/09/12 10:43:43 INFO mapred.JobClient:     Map output materialized bytes=12
13/09/12 10:43:43 INFO mapred.JobClient:     Map input records=10
13/09/12 10:43:43 INFO mapred.JobClient:     Reduce shuffle bytes=0
13/09/12 10:43:43 INFO mapred.JobClient:     Spilled Records=0
13/09/12 10:43:43 INFO mapred.JobClient:     Map output bytes=0
13/09/12 10:43:43 INFO mapred.JobClient:     Total committed heap usage (bytes)=666501120
13/09/12 10:43:43 INFO mapred.JobClient:     CPU time spent (ms)=0
13/09/12 10:43:43 INFO mapred.JobClient:     SPLIT_RAW_BYTES=172
13/09/12 10:43:43 INFO mapred.JobClient:     Combine input records=0
13/09/12 10:43:43 INFO mapred.JobClient:     Reduce input records=0
13/09/12 10:43:43 INFO mapred.JobClient:     Reduce input groups=0
13/09/12 10:43:43 INFO mapred.JobClient:     Combine output records=0
13/09/12 10:43:43 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
13/09/12 10:43:43 INFO mapred.JobClient:     Reduce output records=0
13/09/12 10:43:43 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
13/09/12 10:43:43 INFO mapred.JobClient:     Map output records=0

我在 cassandra 中的输入表 inputword 是:

 id | name      | locate
----+-----------+--------
  5 |     abhar | zanjan
 10 | mahneshan | zanjan
  1 |    zanjan | zanjan
  8 |     abbar | zanjan
  2 |    zanjan | zanjan
  4 |     abhar | zanjan
  7 |     abbar | zanjan
  6 |   gheidar | zanjan
  9 |     abbar | zanjan
  3 |    zanjan | zanjan

以及输出表 outputword:

 id | name      | number
----+-----------+--------

为什么没有输出?

4

0 回答 0