0

我有一个类似于stackoverflow中的以下链接的问题

R+Hadoop:如何从 HDFS 读取 CSV 文件并执行 mapreduce?

我想从 HDFS 中的位置“/somnath/logreg_data/ds1.10.csv”读取文件,将其列数从 10 减少到 5,然后写入另一个位置“/somnath/logreg_data/reduced/ds1.10 .reduced.csv”在 HDFS 中使用下面的 transfer.csvfile.hdfs.to.hdfs.reduced函数。

transfer.csvfile.hdfs.to.hdfs.reduced("hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv", "hdfs://10.5.5.82:8020/somnath/logreg_data/reduced/ds1.10.reduced.csv", 5)

函数定义为

transfer.csvfile.hdfs.to.hdfs.reduced =
                function(hdfsFilePath, hdfsWritePath, reducedCols=1) {
                        #local.df = data.frame()
                        #hdfs.get(hdfsFilePath, local.df)
                        #to.dfs(local.df)
                        #r.file <- hdfs.file(hdfsFilePath,"r")
                        transfer.reduced.map =
                                        function(.,M) {
                                                label <- M[,dim(M)[2]]
                                                reduced.predictors <- M[,1:reducedCols]
                                                reduced.M <- cbind(reduced.predictors, label)
                                                keyval(
                                                     1,
                                                     as.numeric(reduced.M))
                                        }
                        reduced.values =
                             values(
                                     from.dfs(
                                        mapreduce(
                                          input = from.dfs(hdfsFilePath),
                                          input.format = "native",
                                          map = function(.,M) {
                                                label <- M[,dim(M)[2]]
                                                print(label)
                                                reduced.predictors <- M[,1:reducedCols]
                                                reduced.M <- cbind(reduced.predictors, label)
                                                keyval(
                                                     1,
                                                     as.numeric(reduced.M))}
                        )))
                        write.table(reduced.values, file="/root/somnath/reduced.values.csv")
                        w.file <- hdfs.file(hdfsWritePath,"w")
                        hdfs.write(reduced.values,w.file)
                        #to.dfs(reduced.values)
                }

但我收到一个错误

Error in file(fname, paste(if (is.read) "r" else "w", if (format$mode ==  :
  cannot open the connection
Calls: transfer.csvfile.hdfs.to.hdfs.reduced ... make.keyval.reader -> do.call -> <Anonymous> -> file
In addition: Warning message:
In file(fname, paste(if (is.read) "r" else "w", if (format$mode ==  :
  cannot open file 'hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv': No such file or directory
Execution halted

或者

当我尝试使用以下命令从 hdfs 加载文件时,出现以下错误:

> x <- hdfs.file(path="hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv",mode="r")
Error in hdfs.file(path = "hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv",  :
  attempt to apply non-function

任何帮助将不胜感激

谢谢

4

1 回答 1

1

基本上找到了我上面所说的问题的解决方案。

r.file <- hdfs.file(hdfsFilePath,"r")
from.dfs(
    mapreduce(
         input = as.matrix(hdfs.read.text.file(r.file)),
         input.format = "csv",
         map = ...
))

下面是整个修改后的函数:

transfer.csvfile.hdfs.to.hdfs.reduced =
                function(hdfsFilePath, hdfsWritePath, reducedCols=1) {
                        hdfs.init()
                        #local.df = data.frame()
                        #hdfs.get(hdfsFilePath, local.df)
                        #to.dfs(local.df)
                        r.file <- hdfs.file(hdfsFilePath,"r")
                        transfer.reduced.map =
                                        function(.,M) {
                                                numRows <- length(M)
                                                M.vec.elems <-unlist(lapply(M,
                                                                                function(x) strsplit(x, ",")))
                                                M.matrix <- matrix(M.vec.elems, nrow=numRows, byrow=TRUE)
                                                label <- M.matrix[,dim(M.matrix)[2]]
                                                reduced.predictors <- M.matrix[,1:reducedCols]
                                                reduced.M <- cbind(reduced.predictors, label)
                                                keyval(
                                                     1,
                                                     as.numeric(reduced.M))
                                        }
                        reduced.values =
                             values(
                                     from.dfs(
                                        mapreduce(
                                          input = as.matrix(hdfs.read.text.file(r.file)),
                                          input.format = "csv",
                                          map = function(.,M) {
                                                numRows <- length(M)
                                                M.vec.elems <-unlist(lapply(M,
                                                       function(x) strsplit(x, ",")))
                                                M.matrix <- matrix(M.vec.elems, nrow=numRows, byrow=TRUE)
                                                label <- M.matrix[,dim(M.matrix)[2]]
                                                reduced.predictors <- M.matrix[,1:reducedCols]
                                                reduced.M <- cbind(reduced.predictors, label)
                                                keyval(
                                                     1,
                                                     as.numeric(reduced.M)) }
                        )))
                        write.table(reduced.values, file="/root/somnath/reduced.values.csv")
                        w.file <- hdfs.file(hdfsWritePath,"w")
                        hdfs.write(reduced.values,w.file)
                        hdfs.close(r.file)
                        hdfs.close(w.file)
                        #to.dfs(reduced.values)
                }

希望这会有所帮助,如果您觉得有用,请不要忘记给分。提前谢谢

于 2014-07-25T13:17:34.203 回答