我有 7 个节点 cassandra (1.1.1) 和 hadoop (1.03) 集群( tasktracker 在每个 cassandra 节点上安装相同)。
我的列族使用宽行模式。1 行包含大约 200k 列(最多大约 300k)。
我的问题是当我们使用 Hadoop 运行分析作业(计算单词出现的次数)时,我收到的结果是错误的(结果太低,正如我在测试记录中所预期的那样)
当我们在作业跟踪器上监控时,有一个奇怪的是地图进度任务指示错误(在我的下图中),并且当我重新运行作业(相同数据)时“地图输入记录”的数量不同。
这是我的初始化工作代码:
Job job = new Job(conf);
job.setJobName(this.jobname);
job.setJarByClass(BannerCount.class);
job.setMapperClass(BannerViewMapper.class);
job.setReducerClass(BannerClickReducer.class);
FileSystem fs = FileSystem.get(conf);
ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setInputInitialAddress(job.getConfiguration(), "192.168.23.114,192.168.23.115,192.168.23.116,192.168.23.117,192.168.23.121,192.168.23.122,192.168.23.123");
ConfigHelper.setInputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY, true);
ConfigHelper.setRangeBatchSize(job.getConfiguration(), 500);
SlicePredicate predicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
sliceRange.setStart(ByteBufferUtil.EMPTY_BYTE_BUFFER);
sliceRange.setFinish(ByteBufferUtil.EMPTY_BYTE_BUFFER);
sliceRange.setCount(200000);
predicate.setSlice_range(sliceRange);
ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
String outPathString = "BannerViewResultV3" + COLUMN_FAMILY;
if (fs.exists(new Path(outPathString)))
fs.delete(new Path(outPathString), true);
FileOutputFormat.setOutputPath(job, new Path(outPathString));
job.setInputFormatClass(ColumnFamilyInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setNumReduceTasks(28);
job.waitForCompletion(true);
return 1;