我正在尝试使用Amazon Elastic Map Reduce运行数百万个案例的一系列模拟。这是一个没有减速器的 Rscript 流作业。我在 EMR 通话中使用 Identity Reducer --reducer org.apache.hadoop.mapred.lib.IdentityReducer
。
当手动传递一行字符串时,脚本文件在测试和本地运行时运行良好echo "1,2443,2442,1,5" | ./mapper.R
,我得到了我期望的一行结果。但是,当我使用 EMR 上的输入文件中的大约 10,000 个案例(行)测试我的模拟时,我在 10k 输入行中只得到了十几行左右的输出。我已经尝试了几次,但我不知道为什么。Hadoop 作业运行良好,没有任何错误。似乎正在跳过输入行,或者 Identity reducer 可能发生了一些事情。对于有输出的情况,结果是正确的。
我的输入文件是具有以下数据格式的 csv,由逗号分隔的五个整数组成:
1,2443,2442,1,5
2,2743,4712,99,8
3,2443,861,282,3177
etc...
这是我的mapper.R的 R 脚本
#! /usr/bin/env Rscript
# Define Functions
trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)
splitIntoWords <- function(line) unlist(strsplit(line, "[[:space:]]+"))
# function to read in the relevant data from needed data files
get.data <- function(casename) {
list <- lapply(casename, function(x) {
read.csv(file = paste("./inputdata/",x, ".csv", sep = ""),
header = TRUE,
stringsAsFactors = FALSE)})
return(data.frame(list))
}
con <- file("stdin")
line <- readLines(con, n = 1, warn = FALSE)
line <- trimWhiteSpace(line)
values <- unlist(strsplit(line, ","))
lv <- length(values)
cases <- as.numeric(values[2:lv])
simid <- paste("sim", values[1], ":", sep = "")
l <- length(cases) # for indexing
## create a vector for the case names
names.vector <- paste("case", cases, sep = ".")
## read in metadata and necessary data columns using get.data function
metadata <- read.csv(file = "./inputdata/metadata.csv", header = TRUE,
stringsAsFactors = FALSE)
d <- cbind(metadata[,1:3], get.data(names.vector))
## Calculations that use df d and produce a string called 'output'
## in the form of "id: value1 value2 value3 ..." to be used at a
## later time for agregation.
cat(output, "\n")
close(con)
此模拟的(广义)EMR 调用是:
ruby elastic-mapreduce --create --stream --input s3n://bucket/project/input.txt --output s3n://bucket/project/output --mapper s3n://bucket/project/mapper.R --reducer org.apache.hadoop.mapred.lib.IdentityReducer --cache-archive s3n://bucket/project/inputdata.tar.gz#inputdata --name Simulation --num-instances 2
如果有人对我为什么会遇到这些问题有任何见解,我愿意接受建议,以及对 R 脚本的任何更改/优化。
我的另一个选择是将脚本变成一个函数并使用 R 多核包运行并行化应用,但我还没有尝试过。我想让这个在 EMR 上工作。我使用JD Long和