
I've been using bdutil for a year now, with Hadoop and Spark, and it has worked perfectly! Now I have a small problem getting SparkR to work with Google Cloud Storage as HDFS.

Here is my setup:

- bdutil 1.2.1
- a cluster deployed with 1 master and 1 worker, with Spark 1.3.0 installed
- R and SparkR installed on both master and worker

When I run SparkR on the master node, I try to point it at a directory in my GCS bucket in several ways:

1) By using the gs filesystem scheme

> file <- textFile(sc, "gs://xxxxx/dir/")
> count(file)
15/05/27 12:02:02 WARN LoadSnappy: Snappy native library is available
15/05/27 12:02:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/05/27 12:02:02 WARN LoadSnappy: Snappy native library not loaded
collect on 5 failed with java.lang.reflect.InvocationTargetException
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.handleMethodCall(SparkRBackendHandler.scala:111)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:58)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:19)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: No FileSystem for scheme: gs
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1383)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at edu.berkeley.cs.amplab.sparkr.BaseRRDD.getPartitions(RRDD.scala:31)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1511)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
        at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:312)
        at org.apache.spark.api.java.JavaRDD.collect(JavaRDD.scala:32)
        ... 25 more
Error: returnStatus == 0 is not TRUE

2) With an HDFS URL

> file <- textFile(sc, "hdfs://hadoop-stage-m:8020/dir/")
> count(file)
collect on 10 failed with java.lang.reflect.InvocationTargetException
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.handleMethodCall(SparkRBackendHandler.scala:111)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:58)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:19)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://hadoop-stage-m:8020/dir
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:197)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at edu.berkeley.cs.amplab.sparkr.BaseRRDD.getPartitions(RRDD.scala:31)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1511)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
        at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:312)
        at org.apache.spark.api.java.JavaRDD.collect(JavaRDD.scala:32)
        ... 25 more
Error: returnStatus == 0 is not TRUE

3) With a plain path, as I would use in my other Scala Spark jobs: roughly the same error as in 2)

I'm sure I'm missing an obvious step. If anyone can help me with this, that would be great!

Thanks,

PS: I'm 100% sure that the GCS connector works in a classic Scala job!


1 Answer


Short answer

You need core-site.xml, hdfs-site.xml, etc., as well as gcs-connector-1.3.3-hadoop1.jar on the classpath. You can accomplish this with:

export YARN_CONF_DIR=/home/hadoop/hadoop-install/conf:/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar
./sparkR

You'll probably also want other spark-env.sh settings; consider additionally running:

source /home/hadoop/spark-install/conf/spark-env.sh

before ./sparkR. If you're calling sparkR.init manually inside R, this is less essential, since you'll be passing the master argument directly anyway.
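For reference, here is a minimal sketch of that manual route from inside R (assuming the YARN_CONF_DIR export above is still in effect and spark-env.sh has been sourced so MASTER is set; the spark://hadoop-stage-m:7077 fallback URL and the appName are only illustrative guesses, and "xxxxx" is the placeholder bucket from the question):

# Sketch only: call sparkR.init directly instead of relying on the ./sparkR wrapper.
# MASTER is read from the environment (populated by spark-env.sh); the fallback
# standalone URL below is an assumption, not something verified on this cluster.
library(SparkR)

sc <- sparkR.init(
  master  = Sys.getenv("MASTER", unset = "spark://hadoop-stage-m:7077"),
  appName = "SparkR-GCS-test"
)

file <- textFile(sc, "gs://xxxxx/dir/")  # placeholder bucket from the question
count(file)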

Other possible pitfalls:

  1. Make sure your default Java is Java 7. If it's Java 6, run sudo update-alternatives --config java and select Java 7 as the default.
  2. When building sparkR, make sure to set the Spark version: SPARK_VERSION=1.3.0 ./install-dev.sh

Long answer

Generally, the "No FileSystem for scheme" error means we need to make sure core-site.xml is on the classpath; a second error I ran into after fixing the classpath was "java.lang.ClassNotFoundException: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", which means we also need to add gcs-connector-1.3.3.jar to the classpath. Looking through the SparkR helper scripts, the main sparkR binary calls sparkR.init as follows:

  sc <- sparkR.init(Sys.getenv("MASTER", unset = ""))

The MASTER environment variable is commonly found in spark-env.sh scripts, and indeed bdutil populates MASTER inside /home/hadoop/spark-install/conf/spark-env.sh. Normally that would suggest that simply adding source /home/hadoop/spark-install/conf/spark-env.sh should be enough to populate the necessary settings for sparkR, but if we peek inside the sparkR.init definition, we see this:

#' Initialize a new Spark Context.
#'
#' This function initializes a new SparkContext.
#'
#' @param master The Spark master URL.
#' @param appName Application name to register with cluster manager
#' @param sparkHome Spark Home directory
#' @param sparkEnvir Named list of environment variables to set on worker nodes.
#' @param sparkExecutorEnv Named list of environment variables to be used when launching executors.
#' @param sparkJars Character string vector of jar files to pass to the worker nodes.
#' @param sparkRLibDir The path where R is installed on the worker nodes.
#' @param sparkRBackendPort The port to use for SparkR JVM Backend.
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark")
#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark",
#'                  list(spark.executor.memory="1g"))
#' sc <- sparkR.init("yarn-client", "SparkR", "/home/spark",
#'                  list(spark.executor.memory="1g"),
#'                  list(LD_LIBRARY_PATH="/directory of JVM libraries (libjvm.so) on workers/"),
#'                  c("jarfile1.jar","jarfile2.jar"))
#'}

sparkR.init <- function(
  master = "",
  appName = "SparkR",
  sparkHome = Sys.getenv("SPARK_HOME"),
  sparkEnvir = list(),
  sparkExecutorEnv = list(),
  sparkJars = "",
  sparkRLibDir = "") {

  <...>
  cp <- paste0(jars, collapse = collapseChar)

  yarn_conf_dir <- Sys.getenv("YARN_CONF_DIR", "")
  if (yarn_conf_dir != "") {
    cp <- paste(cp, yarn_conf_dir, sep = ":")
  }
  <...>

    if (Sys.getenv("SPARKR_USE_SPARK_SUBMIT", "") == "") {
      launchBackend(classPath = cp,
                    mainClass = "edu.berkeley.cs.amplab.sparkr.SparkRBackend",
                    args = path,
                    javaOpts = paste("-Xmx", sparkMem, sep = ""))
    } else {
      # TODO: We should deprecate sparkJars and ask users to add it to the
      # command line (using --jars) which is picked up by SparkSubmit
      launchBackendSparkSubmit(
          mainClass = "edu.berkeley.cs.amplab.sparkr.SparkRBackend",
          args = path,
          appJar = .sparkREnv$assemblyJarPath,
          sparkHome = sparkHome,
          sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", ""))
    }

This tells us three things:

  1. The default sparkR script cannot pass sparkJars, so there doesn't currently appear to be a convenient way to pass extra libjars as a flag.
  2. There's a TODO to deprecate the sparkJars parameter anyway.
  3. Aside from the sparkJars parameter, the only other thing that goes into the cp/classPath argument is YARN_CONF_DIR (unless I'm missing some other source of classpath additions, or I'm using a different version of sparkR than you). And fortunately, YARN_CONF_DIR appears to be used even if you're not planning to run on YARN (see the sketch after this list).
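Following up on that last point, the same classpath trick can also be applied from within R itself, since sparkR.init only reads YARN_CONF_DIR via Sys.getenv at call time. A sketch, reusing the same bdutil paths as in the short answer:

# Sketch: set YARN_CONF_DIR before calling sparkR.init so that core-site.xml and
# the gcs-connector jar both end up on the backend classpath (paths as laid out
# by bdutil; adjust for hadoop2 or a different gcs-connector version).
Sys.setenv(YARN_CONF_DIR = paste(
  "/home/hadoop/hadoop-install/conf",
  "/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar",
  sep = ":"))

library(SparkR)
sc <- sparkR.init(Sys.getenv("MASTER", unset = ""))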

Altogether, this suggests you probably want at least the variables from /home/hadoop/spark-install/conf/spark-env.sh, since at least some of the hooks appear to look for environment variables commonly defined there; and secondly, we should be able to hack YARN_CONF_DIR to specify both the classpath entry needed to find core-site.xml and the one that puts gcs-connector-1.3.3.jar on the classpath.

So, the answer to your question is:

export YARN_CONF_DIR=/home/hadoop/hadoop-install/conf:/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar
./sparkR

You may need to change the /home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar part if you're using hadoop2 or a different gcs-connector version. That command fixes both the HDFS access and finding the fs.gs.impl for the gcs-connector, as well as making sure the actual gcs-connector jar is on the classpath. It doesn't pull in spark-env.sh, so you may find it defaulting to running with MASTER=local. Assuming your worker nodes also have SparkR correctly installed, you may want to consider running the following instead:

source /home/hadoop/spark-install/conf/spark-env.sh
export YARN_CONF_DIR=/home/hadoop/hadoop-install/conf:/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar
./sparkR
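Once SparkR is up with those settings, re-running the original test from the question is a quick way to confirm that the gs scheme now resolves (a sketch; "xxxxx" is the placeholder bucket name from the question, and the HDFS call will of course still need /dir to actually exist):

# With core-site.xml and the gcs-connector jar on the classpath, the
# "No FileSystem for scheme: gs" error should be gone.
gsFile <- textFile(sc, "gs://xxxxx/dir/")
count(gsFile)

# Only works if /dir really exists on the cluster's HDFS.
hdfsFile <- textFile(sc, "hdfs://hadoop-stage-m:8020/dir/")
count(hdfsFile)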

Some additional caveats based on what I ran into:

  1. You may find that your R installation has an older Java version set. If you run into something like "unsupported major.minor version 51.0", run sudo update-alternatives --config java and make Java 7 the default.
  2. If you're using Spark 1.3.0 and built SparkR with install-dev.sh, Spark may misleadingly hang with "Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory", when in fact the scheduler is fast-failing with a serialVersionUID mismatch, which you can see in /hadoop/spark/logs/*Master*.out - the fix is to make sure you run install-dev.sh with the right Spark version set: SPARK_VERSION=1.3.0 ./install-dev.sh
Answered 2015-05-29T00:03:43.790