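I am working through Jeffrey Breen's R-Hadoop tutorial (October 2012). At the moment I am trying to populate HDFS and then run, in RStudio, the commands Jeffrey published in his tutorial. Unfortunately, I have run into some trouble: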
Update: I have now moved the data folder to:
/home/cloudera/data/hadoop/wordcount
(and likewise for the airline data). When I run populate.hdfs.sh, I get the following output:
[cloudera@localhost ~]$ /home/cloudera/TutorialBreen/bin/populate.hdfs.sh
mkdir: cannot create directory /user/cloudera: File exists
mkdir: cannot create directory /user/cloudera/wordcount: File exists
mkdir: cannot create directory /user/cloudera/wordcount/data: File exists
mkdir: cannot create directory /user/cloudera/airline: File exists
mkdir: cannot create directory /user/cloudera/airline/data: File exists
put: Target /user/cloudera/airline/data/20040325.csv already exists
Then I tried the commands in RStudio as shown in the tutorial, but I get an error at the end. Can someone tell me what I am doing wrong?
> if (LOCAL)
+ {
+ rmr.options.set(backend = 'local')
+ hdfs.data.root = 'data/local/airline'
+ hdfs.data = file.path(hdfs.data.root, '20040325-jfk-lax.csv')
+ hdfs.out.root = 'out/airline'
+ hdfs.out = file.path(hdfs.out.root, 'out')
+ if (!file.exists(hdfs.out))
+ dir.create(hdfs.out.root, recursive=T)
+ } else {
+ rmr.options.set(backend = 'hadoop')
+ hdfs.data.root = 'airline'
+ hdfs.data = file.path(hdfs.data.root, 'data')
+ hdfs.out.root = hdfs.data.root
+ hdfs.out = file.path(hdfs.out.root, 'out')
+ }
> asa.csvtextinputformat = make.input.format( format = function(con, nrecs) {
+ line = readLines(con, nrecs)
+ values = unlist( strsplit(line, "\\,") )
+ if (!is.null(values)) {
+ names(values) = c('Year','Month','DayofMonth','DayOfWeek','DepTime','CRSDepTime',
+ 'ArrTime','CRSArrTime','UniqueCarrier','FlightNum','TailNum',
+ 'ActualElapsedTime','CRSElapsedTime','AirTime','ArrDelay',
+ 'DepDelay','Origin','Dest','Distance','TaxiIn','TaxiOut',
+ 'Cancelled','CancellationCode','Diverted','CarrierDelay',
+ 'WeatherDelay','NASDelay','SecurityDelay','LateAircraftDelay')
+ return( keyval(NULL, values) )
+ }
+ }, mode='text' )
> mapper.year.market.enroute_time = function(key, val) {
+ if ( !identical(as.character(val['Year']), 'Year')
+ & identical(as.numeric(val['Cancelled']), 0)
+ & identical(as.numeric(val['Diverted']), 0) ) {
+ if (val['Origin'] < val['Dest'])
+ market = paste(val['Origin'], val['Dest'], sep='-')
+ else
+ market = paste(val['Dest'], val['Origin'], sep='-')
+ output.key = c(val['Year'], market)
+ output.val = c(val['CRSElapsedTime'], val['ActualElapsedTime'], val['AirTime'])
+ return( keyval(output.key, output.val) )
+ }
+ }
> reducer.year.market.enroute_time = function(key, val.list) {
+ if ( require(plyr) )
+ val.df = ldply(val.list, as.numeric)
+ else { # this is as close as my deficient *apply skills can come w/o plyr
+ val.list = lapply(val.list, as.numeric)
+ val.df = data.frame( do.call(rbind, val.list) )
+ }
+ colnames(val.df) = c('crs', 'actual','air')
+ output.key = key
+ output.val = c( nrow(val.df), mean(val.df$crs, na.rm=T),
+ mean(val.df$actual, na.rm=T),
+ mean(val.df$air, na.rm=T) )
+ return( keyval(output.key, output.val) )
+ }
> mr.year.market.enroute_time = function (input, output) {
+ mapreduce(input = input,
+ output = output,
+ input.format = asa.csvtextinputformat,
+ output.format='csv', # note to self: 'csv' for data, 'text' for bug
+ map = mapper.year.market.enroute_time,
+ reduce = reducer.year.market.enroute_time,
+ backend.parameters = list(
+ hadoop = list(D = "mapred.reduce.tasks=2")
+ ),
+ verbose=T)
+ }
> out = mr.year.market.enroute_time(hdfs.data, hdfs.out)
Error in file(f, if (format$mode == "text") "r" else "rb") :
cannot open the connection
In addition: Warning message:
In file(f, if (format$mode == "text") "r" else "rb") :
cannot open file 'data/local/airline/20040325-jfk-lax.csv': No such file or directory
> if (LOCAL)
+ {
+ results.df = as.data.frame( from.dfs(out, structured=T) )
+ colnames(results.df) = c('year', 'market', 'flights', 'scheduled', 'actual', 'in.air')
+ print(head(results.df))
+ }
Error in to.dfs.path(input) : object 'out' not found
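Judging by the path in the warning, the LOCAL branch ran, so the input is read from the local file system rather than from HDFS, and the relative path is resolved against R's current working directory. A minimal sketch of a check for where that path actually points (check.input is a hypothetical helper I made up for this, not part of rmr or the tutorial, and it assumes the path is relative):

```r
# Path copied from the warning message above. It is relative, so R
# resolves it against the current working directory.
input.path <- file.path('data/local/airline', '20040325-jfk-lax.csv')

# Hypothetical helper (not part of rmr or the tutorial): report where a
# relative path would resolve from a given directory and whether a file
# exists there. Assumes 'path' is relative.
check.input <- function(path, wd = getwd()) {
  resolved <- file.path(wd, path)
  list(wd = wd, resolved = resolved, exists = file.exists(resolved))
}

status <- check.input(input.path)
cat('working directory:', status$wd, '\n')
cat('resolved path:    ', status$resolved, '\n')
cat('file exists:      ', status$exists, '\n')
```

If the last line reports FALSE, then either the working directory is not the tutorial's root folder or the file is not at that location, which would be consistent with the "No such file or directory" warning.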
Thank you very much!