我写了以下程序
#include "hadoop/Pipes.hh"
#include "hadoop/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"
#include "libpq-fe.h"
extern "C" {
#include "traverser.h"
}

#include <string>
#include <vector>

using namespace std;
class IndexMap:public HadoopPipes::Mapper {
public:
IndexMap(HadoopPipes::TaskContext & context) { }

// Splits the input value (a list of filesystem paths, one per line) into
// k space-separated bundles of roughly equal size and emits each bundle
// as a map-output key with the dummy value "0", so each reduce call
// receives one bundle of paths to process.
//
// Fixes over the previous version:
//  * the split delimiter was the literal two characters "rn" (backslashes
//    lost in transit) -- use "\r\n";
//  * `concatpaths[i] = +" " + paths[...]` assigned instead of appending
//    (`+=`), so each bucket kept only its last path;
//  * the emit loop lived inside the `else` branch, so nothing at all was
//    emitted when paths.size() was divisible by k;
//  * the last-chunk index `(k - 1) * paths.size() / (k - 1) + j`
//    simplifies to `paths.size() + j` -- an out-of-bounds read;
//  * `string concatpaths[k]` was a non-standard variable-length array.
void map(HadoopPipes::MapContext & context) {
    std::vector<std::string> paths =
        HadoopUtils::splitString(context.getInputValue(), "\r\n");
    const unsigned int k = 4;          // number of path bundles to emit
    if (paths.empty()) {
        return;                        // nothing to distribute
    }
    // Ceiling division: the first buckets absorb any remainder, and
    // p / chunk is always < k for p < paths.size().
    const std::vector<std::string>::size_type chunk =
        (paths.size() + k - 1) / k;
    std::vector<std::string> concatpaths(k);
    for (std::vector<std::string>::size_type p = 0; p < paths.size(); ++p) {
        std::string & bucket = concatpaths[p / chunk];
        if (!bucket.empty()) {
            bucket += " ";             // space-separate paths inside a bundle
        }
        bucket += paths[p];
    }
    for (unsigned int i = 0; i < k; ++i) {
        // Skip empty buckets (fewer paths than k chunks).
        if (!concatpaths[i].empty()) {
            context.emit(concatpaths[i], "0");
        }
    }
}
};
class IndexReduce:public HadoopPipes::Reducer {
public:
IndexReduce(HadoopPipes::TaskContext & context) { }

// Loads the indexer configuration, runs Traverser() over every path in
// the space-separated bundle produced by the mapper, and emits the last
// path of the bundle together with the accumulated count.
//
// Fixes over the previous version:
//  * the mapper emits the path bundle as the KEY (the value is a dummy
//    "0"), so the paths must come from getInputKey(); the old code split
//    getInputValue() -- i.e. the string "0" -- and never saw a path;
//  * `count = Traverser(...)` discarded all but the last result -- use
//    `+=` so the emitted value is the total (NOTE(review): assumes
//    Traverser returns a per-path count to be summed -- confirm against
//    traverser.h);
//  * an empty bundle made `processedpaths[processedpaths.size() - 1]`
//    read out of bounds -- guard against it.
void reduce(HadoopPipes::ReduceContext & context) {
    std::vector<std::string> processedpaths =
        HadoopUtils::splitString(context.getInputKey(), " ");
    if (processedpaths.empty()) {
        return;  // nothing to index for this key
    }
    // NOTE(review): Configure's return value convention is not visible
    // here; it was ignored before and is still ignored -- verify whether
    // a failure code should abort the traversal.
    Configure("/export/hadoop-1.0.1/src/c++/hadoopc++/src/nsindexer.conf");
    long int count = 0;
    for (std::vector<std::string>::size_type i = 0;
         i < processedpaths.size(); ++i) {
        count += Traverser(processedpaths[i].c_str());
    }
    context.emit(processedpaths[processedpaths.size() - 1],
                 HadoopUtils::toString(count));
}
};
int main(int argc, char *argv[])
{
return HadoopPipes::runTask(HadoopPipes::TemplateFactory<IndexMap, IndexReduce> ());
}
我开始了
root@one: /export/hadoop-1.0.1/bin# ./hadoop pipes -D hadoop.pipes.java.recordreader=true -D hadoop.pipes.java.recordwriter=true -input paths.txt -output out.txt -program bin/parindex
根据这个文件。
我收到以下错误
12/08/29 08:02:10 WARN util.NativeCodeLoader:无法为您的平台加载 native-hadoop 库...在适用的情况下使用内置 java 类 12/08/29 08:02:10 WARN mapred.JobClient:没有设置作业 jar 文件。可能找不到用户类。请参阅 JobConf(Class) 或 JobConf#setJar(String)。 12/08/29 08:02:10 INFO mapred.JobClient:清理暂存区文件:/tmp/hadoop-root/mapred/staging/root-2093574148/.staging/job_local_0001 12/08/29 08:02:10 错误 security.UserGroupInformation:PriviledgedActionException as:根本原因:org.apache.hadoop.mapred.FileAlreadyExistsException:输出目录文件:/export/hadoop-1.0.1/bin/out.txt 已经存在 线程“主”org.apache.hadoop.mapred.FileAlreadyExistsException 中的异常:输出目录文件:/export/hadoop-1.0.1/bin/out.txt 已存在 在 org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs (FileOutputFormat.java:121) 在 org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:891) 在 org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:850) 在 java.security.AccessController.doPrivileged(本机方法) 在 javax.security.auth.Subject.doAs(Subject.java:415) 在 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1093) 在 org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:850) 在 org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:824) 在 org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1261) 在 org.apache.hadoop.mapred.pipes.Submitter.runJob(Submitter.java:248) 在 org.apache.hadoop.mapred.pipes.Submitter.run(Submitter.java:479) 在 org.apache.hadoop.mapred.pipes.Submitter.main(Submitter.java:494)