2

我有一个创建火花上下文的菊石脚本:

#!/usr/local/bin/amm

import ammonite.ops._

import $ivy.`org.apache.spark:spark-core_2.11:2.0.1`

import org.apache.spark.{SparkConf, SparkContext}

@main
def main(): Unit = {
  val sc =  new SparkContext(new SparkConf().setMaster("local[2]").setAppName("Demo"))
}

当我运行这个脚本时,它会抛出一个错误:

Exception in thread "main" java.lang.ExceptionInInitializerError
Caused by: org.apache.spark.SparkException: Error while locating file spark-version-info.properties
...
Caused by: java.lang.NullPointerException
    at java.util.Properties$LineReader.readLine(Properties.java:434)
    at java.util.Properties.load0(Properties.java:353)

该脚本不是从 spark 安装目录运行的,也不知道它或打包此版本信息的资源 - 它只知道 ivy 依赖项。所以也许问题是这个资源信息不在常春藤依赖项的类路径上。我见过其他火花“独立脚本”,所以我希望我能在这里做同样的事情。


我摸索了一下,试图了解发生了什么。我希望我可以在运行时以编程方式将一些构建信息破解到系统属性中。

异常的来源来自spark 库中的package.scala。相关的代码位是

val resourceStream = Thread.currentThread().getContextClassLoader.
  getResourceAsStream("spark-version-info.properties")

try {
  val unknownProp = "<unknown>"
  val props = new Properties()
  props.load(resourceStream) <--- causing a NPE?
  (
    props.getProperty("version", unknownProp),
    // Load some other properties
  )
} catch {
  case npe: NullPointerException =>
    throw new SparkException("Error while locating file spark-version-info.properties", npe)

似乎隐含的假设是,props.load如果在资源中找不到版本信息,NPE 将失败。(这对读者来说不是很清楚!)

NPE 本身看起来像是来自以下代码java.util.Properties.java

class LineReader {
    public LineReader(InputStream inStream) {
        this.inStream = inStream;
        inByteBuf = new byte[8192];
    }

    ...
    InputStream inStream;
    Reader reader;

    int readLine() throws IOException {
      ...
      inLimit = (inStream==null)?reader.read(inCharBuf)
                                :inStream.read(inByteBuf);

TheLineReader是用 null 构造的InputStream,该类在内部将其解释为表示 thereader是非 null 并且应该改为使用 - 但它也是null. (这种东西真的在标准库里吗?看起来很不安全……)


bin/spark-shell火花附带的看,它会-Dscala.usejavacp=true在启动时添加spark-submit。这是正确的方向吗?

谢谢你的帮助!

4

1 回答 1

1

以下似乎适用于 1.0.1 版本的 2.11,但不是实验性的。

可以在 Spark 2.2 上更好地实现

#!/usr/local/bin/amm

import ammonite.ops._
import $ivy.`org.apache.spark:spark-core_2.11:2.2.0` 
import $ivy.`org.apache.spark:spark-sql_2.11:2.2.0`
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._ 
import org.apache.spark.sql.SparkSession

@main
def main(): Unit = {
    val sc =  new SparkContext(new SparkConf().setMaster("local[2]").setAppName("Demo"))
}

或更扩展的答案:

@main
def main(): Unit = {
    val spark = SparkSession.builder()
      .appName("testings")
      .master("local")
      .config("configuration key", "configuration value")
      .getOrCreate 
  val sqlContext = spark.sqlContext
  val tdf2 = spark.read.option("delimiter", "|").option("header", true).csv("./tst.dat")
  tdf2.show()
}
于 2017-08-07T21:40:05.503 回答