I have a question similar to this one on Stack Overflow:
R+Hadoop: How to read CSV file from HDFS and execute mapreduce?
I want to read a file from the HDFS location "/somnath/logreg_data/ds1.10.csv", reduce its number of columns from 10 to 5, and then write the result to another HDFS location, "/somnath/logreg_data/reduced/ds1.10.reduced.csv", using the
transfer.csvfile.hdfs.to.hdfs.reduced
function shown below. I call it like this:
transfer.csvfile.hdfs.to.hdfs.reduced("hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv", "hdfs://10.5.5.82:8020/somnath/logreg_data/reduced/ds1.10.reduced.csv", 5)
The function is defined as:
transfer.csvfile.hdfs.to.hdfs.reduced =
    function(hdfsFilePath, hdfsWritePath, reducedCols = 1) {
        # earlier attempts, kept for reference:
        #local.df = data.frame()
        #hdfs.get(hdfsFilePath, local.df)
        #to.dfs(local.df)
        #r.file <- hdfs.file(hdfsFilePath, "r")

        # helper mapper (currently unused; the map is inlined in mapreduce() below)
        transfer.reduced.map =
            function(., M) {
                label <- M[, dim(M)[2]]                  # last column is the label
                reduced.predictors <- M[, 1:reducedCols] # keep the first reducedCols columns
                reduced.M <- cbind(reduced.predictors, label)
                keyval(1, as.numeric(reduced.M))
            }

        reduced.values =
            values(
                from.dfs(
                    mapreduce(
                        input = from.dfs(hdfsFilePath),
                        input.format = "native",
                        map = function(., M) {
                            label <- M[, dim(M)[2]]
                            print(label)
                            reduced.predictors <- M[, 1:reducedCols]
                            reduced.M <- cbind(reduced.predictors, label)
                            keyval(1, as.numeric(reduced.M))
                        }
                    )))

        # dump the result locally, then write it back to HDFS
        write.table(reduced.values, file = "/root/somnath/reduced.values.csv")
        w.file <- hdfs.file(hdfsWritePath, "w")
        hdfs.write(reduced.values, w.file)
        #to.dfs(reduced.values)
    }
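For completeness, this is roughly how I set up the session before calling the function. The Hadoop paths below are specific to my cluster, so treat them as placeholders; I am not fully certain this initialization is complete, which may be related to the second error further down:

# assumed session setup (paths are from my cluster; adjust as needed)
Sys.setenv(HADOOP_CMD = "/usr/bin/hadoop")
Sys.setenv(HADOOP_STREAMING = "/usr/lib/hadoop-mapreduce/hadoop-streaming.jar")
library(rmr2)   # provides mapreduce(), from.dfs(), to.dfs(), keyval()
library(rhdfs)  # provides hdfs.file(), hdfs.read(), hdfs.write()
hdfs.init()     # required before any hdfs.* call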
But when I run the call above, I get the following error:
Error in file(fname, paste(if (is.read) "r" else "w", if (format$mode == :
cannot open the connection
Calls: transfer.csvfile.hdfs.to.hdfs.reduced ... make.keyval.reader -> do.call -> <Anonymous> -> file
In addition: Warning message:
In file(fname, paste(if (is.read) "r" else "w", if (format$mode == :
cannot open file 'hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv': No such file or directory
Execution halted
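From reading the rmr2 documentation, I suspect this first error comes from passing from.dfs(hdfsFilePath) as the input and using the "native" format: from.dfs() expects an rmr2 native object, not a plain CSV file. If I understand correctly, the HDFS path should be handed to mapreduce() directly together with a CSV input format, roughly like this (an untested sketch; the make.input.format/make.output.format settings are my guess at the right configuration):

# sketch: describe the CSV layout and let mapreduce() read the path itself
csv.in  <- make.input.format("csv", sep = ",")   # parse each input split as CSV
csv.out <- make.output.format("csv", sep = ",")  # write plain CSV back out
out <- mapreduce(
    input         = hdfsFilePath,    # the HDFS path itself, not from.dfs(...)
    input.format  = csv.in,
    output        = hdfsWritePath,   # write the result directly to HDFS
    output.format = csv.out,
    map           = function(k, M) {
        # M arrives as a data frame chunk; reducedCols comes from the enclosing function
        label <- M[, ncol(M)]
        keyval(NULL, cbind(M[, 1:reducedCols], label))
    })

Is this the right way to wire up a CSV input, or am I misreading the docs?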
Alternatively, when I try to load the file from HDFS with the command below, I get this error:
> x <- hdfs.file(path="hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv",mode="r")
Error in hdfs.file(path = "hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv", :
attempt to apply non-function
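As far as I can tell, "attempt to apply non-function" would mean that hdfs.file is not bound to the rhdfs function at that point, i.e. rhdfs was not loaded or hdfs.init() was not called first. The minimal sequence I would expect to work is the following (my assumption, not yet verified on this cluster):

library(rhdfs)
hdfs.init()    # without this, the hdfs.* functions are not usable
x <- hdfs.file("hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv", "r")
m <- hdfs.read(x)    # reads the first chunk of the file as a raw vector
df <- read.csv(textConnection(rawToChar(m)), header = FALSE)
hdfs.close(x)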
Any help would be much appreciated.
Thanks