1

我的设置是在 AWS 中运行的 3 节点集群。我已经摄取了我的数据(30 万行)并且在使用 jupyter notebook 运行查询时没有问题。但现在我正在尝试使用 spark 和 java 运行查询,如下面的代码片段所示。

public class SparkSqlTest {

    private static final Logger log = Logger.getLogger(SparkSqlTest.class);


    public static void main(String[] args) {
        Map<String, String> dsParams = new HashMap<>();
        dsParams.put("instanceId", "gis");
        dsParams.put("zookeepers", "server ip");
        dsParams.put("user", "root");
        dsParams.put("password", "secret");
        dsParams.put("tableName", "posiciones");

        try {
            DataStoreFinder.getDataStore(dsParams);
            SparkConf conf = new SparkConf();
            conf.setAppName("testSpark");
            conf.setMaster("yarn");
            SparkContext sc = SparkContext.getOrCreate(conf);
            SparkSession ss = SparkSession.builder().config(conf).getOrCreate();

            Dataset<Row> df = ss.read()
                .format("geomesa")
                .options(dsParams)
                .option("geomesa.feature", "posicion")
                .load();
            df.createOrReplaceTempView("posiciones");

            long t1 = System.currentTimeMillis();
            Dataset<Row> rows = ss.sql("select count(*) from posiciones where id_equipo = 148 and fecha_hora >= '2015-04-01' and fecha_hora <= '2015-04-30'");
            long t2 = System.currentTimeMillis();
            rows.show();

            log.info("Tiempo de la consulta: " + ((t2 - t1) / 1000) + " segundos.");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

我将代码上传到我的主 EC2 框中(在jupyter 笔记本图像内),并使用以下命令运行它:

docker cp myjar-0.1.0.jar jupyter:myjar-0.1.0.jar
docker exec jupyter sh -c '$SPARK_HOME/bin/spark-submit --master yarn --class mypackage.SparkSqlTest file:///myjar-0.1.0.jar --jars $GEOMESA_SPARK_JARS'

但我收到以下错误:

17/09/15 19:45:01 INFO HSQLDB4AD417742A.ENGINE: dataFileCache open start
17/09/15 19:45:02 INFO execution.SparkSqlParser: Parsing command: posiciones
17/09/15 19:45:02 INFO execution.SparkSqlParser: Parsing command: select count(*) from posiciones where id_equipo = 148 and fecha_hora >= '2015-04-01' and fecha_hora <= '2015-04-30'
java.lang.RuntimeException: Could not find a SpatialRDDProvider
at org.locationtech.geomesa.spark.GeoMesaSpark$$anonfun$apply$2.apply(GeoMesaSpark.scala:33)
at org.locationtech.geomesa.spark.GeoMesaSpark$$anonfun$apply$2.apply(GeoMesaSpark.scala:33)

任何想法为什么会发生这种情况?

4

1 回答 1

1

我终于理清了,我的问题是我的 pom.xml 中没有包含以下条目

    <dependency>
        <groupId>org.locationtech.geomesa</groupId>
        <artifactId>geomesa-accumulo-spark_2.11</artifactId>
        <version>${geomesa.version}</version>
    </dependency>

    <dependency>
        <groupId>org.locationtech.geomesa</groupId>
        <artifactId>geomesa-spark-converter_2.11</artifactId>
        <version>${geomesa.version}</version>
    </dependency>
于 2017-09-17T13:42:22.147 回答