1

这几天我一直在谷歌上搜索。

大图:我正在尝试使用任何必要的方式处理来自 RDD[String] 的 XML 文件——scala-xml、spark-xml,或者滚动我自己的 XML 处理程序,这对于特定的 XML 源来说已经足够好了。我刚刚切换到 Scala 并且对它有点陌生,因为 Python 对于大规模的 Spark 程序变得非常低效,而且我还没有准备好向它投入更多的计算能力。

第一个障碍:我现在似乎无法执行基本的地图功能,并且仍然在 IntelliJ 中调试我的程序。我正在尝试这样做:

val rddOfXMLFiles = sc.wholeTextFiles(folderPathString) // Makes a Tuple2[String, String]
val rddOfXML = rddOfXMLFiles.map(x => x._2) // Gets the second item, the actual XML.

当我在 IntelliJ 中运行它时,我得到:

Exception in thread "main" org.apache.spark.SparkException:
Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times,
most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 192.168.28.1):
java.lang.ClassNotFoundException: sparkxmltest$$anonfun$3

(这就是一行,但我不会让你向右滚动三页。)

显然,worker 没有得到匿名函数的编译代码。

我可以看到编译后的代码存在于:

C:\Users\xxxxx\IdeaProjects\sparkxmltest\target\scala-2.11\classes\sparkxmltest$$anonfun$3.class

当它构建 jar 时,我可以在 7-Zip 中打开它,看到 sparkxmltest$$anonfun$3.class 存在于 jar 的根目录中。

在获得大型集群之前,我正在使用部署在本地机器上的独立集群进行测试。我在系统的路径中添加了一个文件夹 C:\sparkjars,这当然是 Master 和 Worker 正在运行的同一系统。我在项目结构中添加了一个包含所有封装依赖项的 uber JAR,并将其设置为在每个项目构建上构建,然后将其放在该文件夹中。

我将此添加到核心程序中:

conf.set("spark.driver.extraClassPath", "file:///C:/sparkjars/")
conf.set("spark.executor.extraClassPath", "file:///C:/sparkjars/")

所以这是整个程序:(请原谅我进行了一些不必要的导入;这是测试一个更大程序的一小部分,在遇到这个问题之前我已经奇怪地走了很长一段路。)

import org.apache.spark.{SparkConf, SparkContext, sql}
import org.apache.spark.sql.{DataFrame, Dataset, SQLContext, SparkSession}
import com.databricks.spark.xml
import com.databricks.spark.xml.XmlReader
import org.apache.spark.rdd.RDD

import scala.io.Source
// import scala.util.matching.Regex
// import java.io.File
import java.nio.file

object sparkxmltest {
  var LastTimeStamp = System.nanoTime()

  def cheapoBenchmark() : Long = {
    val previousTimeStamp = LastTimeStamp
    LastTimeStamp = System.nanoTime()
    LastTimeStamp - previousTimeStamp
  }

  def safeArrayIndex(a: Array[String], i: Int): Option[String] = {
    try Some(a(i)) catch { case _: Throwable => None }
  }

  def getSecondElement(insies: Tuple2[String, String]) : String = {
    insies._2
  }

  def main(args: Array[String]): Unit = {
    val ipAddress = safeArrayIndex(args, 0).getOrElse("0.0.0.0")
    val folderPathString = safeArrayIndex(args, 1).getOrElse("file:///C:/sparkwarehouse")
    val conf = new SparkConf()
    conf.setAppName("Test XML")
    conf.setMaster(s"spark://$ipAddress:7077")
    conf.set("spark.sql.warehouse.dir", "file:///C:/sparkwarehouse/")
    conf.set("spark.driver.extraClassPath", "file:///C:/sparkjars/")
    conf.set("spark.executor.extraClassPath", "file:///C:/sparkjars/")
    val sc = new SparkContext(conf)
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    val sqlCtx = spark.sqlContext

    val rddOfXMLFiles = sc.wholeTextFiles(folderPathString)
    val rddOfXML = rddOfXMLFiles.map(x => x._2)
    println("Input Text:")
    rddOfXML.collect().foreach(println)

    // val df = new XmlReader().xmlRdd(sqlCtx, rddOfXML)
    println("End")
  }
}

这是构建:

name := "sparkxmltest"

version := "1.0"

scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.0.0" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.0.0" % "compile"
libraryDependencies += "com.databricks" % "spark-xml_2.11" % "0.4.1"

现在,我不能成为第一个尝试使用匿名函数映射 RDD 的人。如何让工人有权访问匿名函数?

Spark-Submit 似乎确实解决了这个问题(最终),但我无法从 STDOUT 获得任何有用的东西。

(另一个烦人的细节:我无法在没有证书错误的情况下提交火花,直到我在 7-Zip 中打开生成的 JAR 并删除 META-INF\DUMMY.SF 。它给了我:

Exception in thread "main" java.lang.SecurityException: Invalid signature file digest for Manifest main attributes

我花了几个小时从互联网帖子中推断出来。但我不知道如何让 SBT 自动停止包含该证书文件。)

当我提交火花时它确实有效。我想任何成功的运行都足够好。但是您可能会理解为什么这个工作流程会使原型设计和测试变得困难。

当我设置 master: local[*] 时它确实有效但我所做的不仅仅是让我的 Spark 脚本在单个本地机器上运行。就像 Spark-Submit 一样,我想这确实证明了项目中的依赖关系是可以的。(你不会相信我花了多长时间学习如何/我什至不得不检查我对 mavenrepository 的版本依赖关系......但就像我说的,我是一个 n00b。为了那些可能跟随的人:如果你'正在使用 Spark 独立集群,请确保集群中安装的版本与您在程序中使用的版本相匹配,您使用的 Scala 版本与构建 Spark 的版本相匹配,该特定版本的 Scala 是安装并在 Path 上,该 Spark-Core 版本与您正在使用的其他库的依赖项版本匹配,例如 Spark-XML、Spark-SQL 等。

长话短说,这一切都在一起工作。现在我不必要地特别挑剔。出于快速测试的目的,我如何确保我构建的项目在所有 Workers 的类路径上都可用,而不使用 Spark-Submit,即,从 Scala 程序本身内部进行?这可能是 SparkConf 中的一个选项。作为构建的一部分,我应该能够让 IntelliJ 将 uber-JAR 吐出到公共文件夹中。如果我可以从 Scala 程序中将它添加到工作人员的类路径中,那么这意味着我可以在 IntelliJ 中进行快速测试和调试。

如果我使用的是更大的集群,我认为有一个“jars”选项可以让 Master 将我刚刚构建的 JAR 分发给 Workers。但是当他们到达那里时,他们仍然需要继续使用类路径——我怎么知道路径应该是什么,基于 JAR 将存储在 Worker 上的位置?

那么谁能告诉我(以及其他所有尝试做同样事情的人)我如何有效地构建项目并以我随后可以在 IntelliJ 中调试的方式部署它?

4

0 回答 0