1

SnappyData v.0-5

我的目标是运行 snappydata 驱动程序以连接到远程服务器中的 SnappyData。我写了一个 Junit 来做到这一点。但是,当我运行它时,我得到一个 SparkContext 被实例化的错误:

**java.lang.NoClassDefFoundError: org/eclipse/jetty/server/handler/GzipHandler**
    at org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:235)
    at org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:234)
    at org.apache.spark.ui.WebUI.bind(WebUI.scala:136)
    at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:499)
    at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:499)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:499)

我的 pom.xml 依赖项是:

 <dependency>
        <groupId>io.snappydata</groupId>
        <artifactId>snappy-core_2.10</artifactId>
        <version>0.5</version>
    </dependency>
    <dependency>
        <groupId>io.snappydata</groupId>
        <artifactId>snappy-cluster_2.10</artifactId>
        <version>0.5</version>
 <dependency>


@Test
public void testInsertDataFromCsv() throws Exception {

    SparkConf conf = new SparkConf();
    conf.setMaster("spark://snappy-lead-host:8090");
    conf.setAppName("MySparkApp");

    SparkContext sc = new SparkContext(conf);
    SnappyContext snappyContext = new SnappyContext(sc);

    String fileResource = "data.csv";

    DataFrame dataFrame = snappyContext.read()
            .format("com.databricks.spark.csv").option("header", "true")
            .option("inferSchema", "true").load(fileResource);

    JavaRDD<Row> row = dataFrame.javaRDD();
    System.out.println(row.toDebugString());

    dataFrame.write().insertInto("example_table_col");

}
4

1 回答 1

2

SnappyData 集群的一个主要特性是长时间运行的 Spark 执行器(与数据存储 JVM 本身相同)。该程序的意图似乎是连接到现有集群,但它会尝试启动一组新的执行器 JVM 进行处理,这正是 Spark 正常工作的方式。SnappyData 领导不支持该模式,因为它旨在重用现有数据节点以执行。

Spark 中的这种限制是因为集群中只能有一个驱动程序已经在 SnappyData 领导节点中运行,因此不能创建新的驱动程序(我们确实打算在未来的版本中消除这个限制)。因此,指向前导节点的 URL"spark://..."将不起作用。运行 Spark 作业需要这些可能的部署策略之一(除了使用 JDBC/ODBC 客户端直接提交 SQL)。

注意:对于嵌入式模式,两者snappy-clustersnappy-core依赖项都是必需的,而对于其他两种模式,只snappy-core应作为依赖项添加。

嵌入式模式执行:与 JDBC/ODBC 客户端一样,这里的执行发生在数据节点本身中。这需要通过在活动领导节点上运行的作业服务器提交作业。程序必须使用 REST API(提供的脚本或类似的东西用于自包含测试)来实现提交它SnappySQLJob/JavaSnappySQLJob。详情在这里:http ://snappydatainc.github.io/snappydata/jobs/snappy-job.sh

public Object runJavaJob(SnappyContext snappyContext, Config config) {
    String fileResource = "data.csv";

    DataFrame dataFrame = snappyContext.read()
            .format("com.databricks.spark.csv").option("header", "true")
            .option("inferSchema", "true").load(fileResource);

    dataFrame.write().insertInto("example_table_col");

    // for debugging
    JavaRDD<Row> row = dataFrame.javaRDD();
    return row.toDebugString();
    // return Boolean.TRUE;
}

public JSparkJobValidation isValidJob(SnappyContext snappyContext,
                                      Config config) {
    return new JSparkJobValid();
}

本地拆分模式:在此模式下,执行集群是 spark localmaster,因此与 snappydata 集群分开。这不会提供良好的性能,因为它必须为大多数查询从数据节点获取大量数据,但它应该最容易用于具有少量数据的功能测试。使用 master aslocal并设置snappydata.store.locators属性指向定位器(参见http://snappydatainc.github.io/snappydata/connectingToCluster/和之前的链接)

@Test
public void testInsertDataFromCsv() throws Exception {

    SparkConf conf = new SparkConf();
    conf.setMaster("local[*]");
    conf.setAppName("MySparkApp");
    // below property can also be fetched with
    // io.snappydata.Property.Locators().apply()
    conf.set("snappydata.store.locators", "snappy-locator-host:10334");

    SparkContext sc = new SparkContext(conf);
    SnappyContext snappyContext = new SnappyContext(sc);

    String fileResource = "data.csv";

    DataFrame dataFrame = snappyContext.read()
            .format("com.databricks.spark.csv").option("header", "true")
            .option("inferSchema", "true").load(fileResource);

    JavaRDD<Row> row = dataFrame.javaRDD();
    System.out.println(row.toDebugString());

    dataFrame.write().insertInto("example_table_col");
}

拆分模式执行:最后,执行集群可以是一个普通的 Spark/Yarn/Mesos 集群,它将与 snappydata 集群作为一个普通的数据存储进行通信。这就是其他产品的 spark 连接器的工作方式(例如 Cassandra 连接器,其中 Cassandra 与 Spark 集群分开)。它可以在与 snappydata 集群相同的节点上运行以获得最佳性能,并且 snappydata 将努力确保路由执行,以便仅获取或插入来自本地表数据的数据。start-all.sh使用snappydata 发行版本身或 Apache Spark 1.6.x(或Apache Spark 文档中的 Yarn/Mesos 集群)启动一个单独的 Spark 集群)。代码将与上面的本地拆分模式相同,master 指向 Spark/Yarn/Mesos 主控,而不是 snappydata 引导。有关详细信息,请参阅本地拆分模式中的链接。

于 2016-08-06T15:51:29.183 回答