3

我没有太多使用 cassandra 的经验,所以如果我采取了错误的方法,请原谅。

我正在尝试使用 map reduce 在 cassandra 中进行批量加载

基本上是字数示例

参考:http ://henning.kropponline.de/2012/11/15/using-cassandra-hadoopbulkoutputformat/

我已经放置了简单的 Hadoop Wordcount Mapper 示例,并按照上面的示例稍微修改了驱动程序代码和减速器。

我也成功生成了输出文件。现在我的疑问是如何执行加载到 cassandra 部分?我的方法有什么不同吗?

请指教。

这是驱动程序代码的一部分

 Job job = new Job();
 job.setJobName(getClass().getName());
 job.setJarByClass(CassaWordCountJob.class);

 Configuration conf = job.getConfiguration();
 conf.set("cassandra.output.keyspace", "test");
 conf.set("cassandra.output.columnfamily", "words");
 conf.set("cassandra.output.partitioner.class", "org.apache.cassandra.dht.RandomPartitioner");
 conf.set("cassandra.output.thrift.port","9160");    // default
 conf.set("cassandra.output.thrift.address", "localhost");
 conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", "400");

 job.setMapperClass(CassaWordCountMapper.class);
 job.setMapOutputKeyClass(Text.class);
 job.setMapOutputValueClass(IntWritable.class);
 FileInputFormat.setInputPaths(job, new Path(args[0]));
 job.setReducerClass(CassaWordCountReducer.class);
 FileOutputFormat.setOutputPath(job, new Path("/home/user/Desktop/test/cassandra")); 
 MultipleOutputs.addNamedOutput(job, "reducer", BulkOutputFormat.class, ByteBuffer.class, List.class);
 return job.waitForCompletion(true) ? 0 : 1;

Mapper 与普通的 wordcount 映射器相同,只是标记并发出 Word,1

减速器类的形式是

public class CassaWordCountReducer extends 
        Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        List<Mutation> columnsToAdd = new ArrayList<Mutation>();
        Integer wordCount = 0;
        for(IntWritable value : values) {
            wordCount += value.get();
        }
        Column countCol = new Column(ByteBuffer.wrap("count".getBytes()));
        countCol.setValue(ByteBuffer.wrap(wordCount.toString().getBytes()));
        countCol.setTimestamp(new Date().getTime());
        ColumnOrSuperColumn wordCosc = new ColumnOrSuperColumn();
        wordCosc.setColumn(countCol);
        Mutation countMut = new Mutation();
        countMut.column_or_supercolumn = wordCosc;
        columnsToAdd.add(countMut);
        context.write(ByteBuffer.wrap(key.toString().getBytes()), columnsToAdd);
    }
}
4

1 回答 1

3

要将批量加载到 Cassandra 中,我建议您查看DataStax 中的这篇文章。基本上你需要为批量加载做两件事:

  • 您的输出数据本身不适合 Cassandra,您需要将其转换为 SSTables。
  • 拥有 SSTables 后,您需要能够将它们流式传输到 Cassandra。当然你不只是想把每个 SSTable 复制到每个节点,你只想把数据的相关部分复制到每个节点

在您使用 的情况下BulkOutputFormat,它应该sstableloader在幕后使用所有这些。我从来没有用过它MultipleOutputs,但它应该可以正常工作。

我认为你的情况的错误是你没有MultipleOutputs正确使用:你还在做 a context.write,当你真的应该写你的MultipleOutputs对象时。你现在这样做的方式,因为你正在写入常规Context,它将被默认输出格式TextOutputFormat而不是你在MultipleOutputs. 有关如何MultipleOutputs在减速器中使用的更多信息,请点击此处

一旦你写入了BulkOutputFormat你定义的正确输出格式,你的 SSTables 应该被创建并从集群中的每个节点流式传输到 Cassandra——你不需要任何额外的步骤,输出格式会为你处理好。

另外,我建议您查看这篇文章,他们还解释了如何使用BulkOutputFormat,但他们正在使用ConfigHelper您可能想要查看的,以便更轻松地配置您的 Cassandra 端点。

于 2013-02-05T06:15:27.020 回答