
I am trying to run this small spark-xml example, and it fails with an exception when I run it via spark-submit.

Sample repo: https://github.com/punithmailme/spark-xml-new

Command: ./dse spark-submit --class MainDriver /Users/praj3/Desktop/projects/spark/main/build/libs/main.jar
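(If the com.databricks classes are not shaded into main.jar -- an assumption about the packaging, not something stated in the question -- spark-submit can also fetch them at launch with the standard --packages flag, assuming the node can reach a Maven repository:)

./dse spark-submit \
  --class MainDriver \
  --packages com.databricks:spark-xml_2.11:0.4.1 \
  /Users/praj3/Desktop/projects/spark/main/build/libs/main.jar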

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import lombok.Builder;
import lombok.Data;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class MainDriver {

  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName("Unit Test");
    sparkConf.setMaster("local[2]");

    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
    SQLContext sqlCon = new SQLContext(javaSparkContext);

    // Two sample rows to write out as XML.
    final JavaRDD<Book> parallelize = javaSparkContext
        .parallelize(Arrays.asList(Book.builder().name("2341").test("34").build(),
            Book.builder().name("2341").test("34").build()));

    final JavaRDD<Row> map = parallelize.map(book -> RowFactory.create(
        book.getName(),
        book.getTest()
    ));

    // Explicit schema for the two string columns.
    List<StructField> fields = new ArrayList<>();
    fields.add(new StructField("Name", DataTypes.StringType, true, Metadata.empty()));
    fields.add(new StructField("Test", DataTypes.StringType, true, Metadata.empty()));
    final StructType structType = DataTypes.createStructType(fields);

    final Dataset<Row> dataFrame = sqlCon.createDataFrame(map, structType);

    dataFrame
        .repartition(1)
        .write()
        .format("com.databricks.spark.xml")
        .mode(SaveMode.Overwrite)
        .option("rootTag", "n:Brands") // namespace-prefixed root tag
        .option("rowTag", "n:Brand")   // namespace-prefixed row tag
        .save("new");

  }
}


@Data
@Builder
class Book implements Serializable {

  private final String name;
  private final String test;
}

The exception:

Caused by: javax.xml.stream.XMLStreamException: Trying to output second root, <n:Brand>
    at com.ctc.wstx.sw.BaseStreamWriter.throwOutputError(BaseStreamWriter.java:1537) ~[woodstox-core-asl-4.4.1.jar:4.4.1]
    at com.ctc.wstx.sw.BaseStreamWriter.throwOutputError(BaseStreamWriter.java:1544) ~[woodstox-core-asl-4.4.1.jar:4.4.1]
    at com.ctc.wstx.sw.BaseStreamWriter.reportNwfStructure(BaseStreamWriter.java:1572) ~[woodstox-core-asl-4.4.1.jar:4.4.1]
    at com.ctc.wstx.sw.BaseNsStreamWriter.checkStartElement(BaseNsStreamWriter.java:469) ~[woodstox-core-asl-4.4.1.jar:4.4.1]
    at com.ctc.wstx.sw.BaseNsStreamWriter.writeStartElement(BaseNsStreamWriter.java:290) ~[woodstox-core-asl-4.4.1.jar:4.4.1]
    at com.sun.xml.internal.txw2.output.DelegatingXMLStreamWriter.writeStartElement(DelegatingXMLStreamWriter.java:45) ~[na:1.8.0_144]
    at com.sun.xml.internal.txw2.output.IndentingXMLStreamWriter.writeStartElement(IndentingXMLStreamWriter.java:148) ~[na:1.8.0_144]
    at com.databricks.spark.xml.parsers.StaxXmlGenerator$.apply(StaxXmlGenerator.scala:128) ~[main.jar:na]
    at com.databricks.spark.xml.util.XmlFile$$anonfun$1$$anon$1.next(XmlFile.scala:108) ~[main.jar:na]
    at com.databricks.spark.xml.util.XmlFile$$anonfun$1$$anon$1.next(XmlFile.scala:96) ~[main.jar:na]
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) ~[scala-library-2.11.11.jar:na]
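The trace fails inside StaxXmlGenerator while writing the namespace-prefixed <n:Brand> row tag. As a sanity check (my suggestion, not something tried in the original post), the same write with unprefixed tags sidesteps the prefix handling and narrows the problem down:

dataFrame
    .repartition(1)
    .write()
    .format("com.databricks.spark.xml")
    .mode(SaveMode.Overwrite)
    .option("rootTag", "Brands") // no "n:" prefix
    .option("rowTag", "Brand")
    .save("new");

If this version succeeds under spark-submit, the failure is specific to prefixed tags rather than to the write path as a whole.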

Environment and dependencies on Mac: DataStax Enterprise 5.1.8 with the following dependencies:

compile(
        [group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.1.2'],
        [group: 'org.projectlombok', name: 'lombok', version: lombokVersion],
        [group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.11', version: '2.0.7'],
        [group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.9.5'],
        [group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.5'],
        [group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.5'],
)

compile(group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.1.2') {
    exclude group: 'org.slf4j', module: 'slf4j-log4j12' // because of a log4j/slf4j conflict
}

compile(
        [group: 'com.databricks', name: 'spark-csv_2.11', version: '1.5.0'],
        [group: 'com.databricks', name: 'spark-xml_2.11', version: '0.4.1']
)

DSE 5.1.8 components:

  • Apache Cassandra™ 3.11.1.2261
  • Apache Solr™ 6.0.1.0.2224
  • Apache Spark™ 2.0.2.17
  • DSE Java Driver 1.2.6
  • Spark Jobserver 0.6.2.237
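One mismatch visible above (my observation, not raised in the thread): the build compiles against Spark 2.1.2 while DSE 5.1.8 ships Spark 2.0.2.17, so bundling Spark and its Jackson stack into main.jar can shadow the cluster's own versions. A minimal Gradle sketch that compiles against Spark but leaves it to the cluster at runtime, assuming the standard java plugin:

// build.gradle (sketch): Spark is provided by DSE at runtime,
// so compile against it without packaging it into main.jar.
dependencies {
    compileOnly group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.1.2'
    compileOnly group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.1.2'

    // Libraries that should ship with the application jar.
    compile group: 'com.databricks', name: 'spark-xml_2.11', version: '0.4.1'
}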

When I run this as a plain main method in a single thread it works; it only fails under spark-submit!
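One detail that fits that exact symptom (my reading, not confirmed in the thread): the code hard-codes setMaster("local[2]"), which overrides the master that dse spark-submit tries to set. A small sketch that defers to spark-submit but keeps a local fallback for IDE runs:

SparkConf sparkConf = new SparkConf().setAppName("Unit Test");
// spark-submit supplies spark.master; fall back to local[2] only
// when the class is launched directly (e.g. from the IDE).
if (!sparkConf.contains("spark.master")) {
    sparkConf.setMaster("local[2]");
}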


1 Answer


I have tried the sample code on a YARN cluster, and it also runs fine on AWS EMR with S3.

https://github.com/mkanchwala/spark-databricks-example

Please give it a try and let me know.

answered 2018-07-17T13:15:39.660