4

我们正在尝试将 ES(1.7.2,4 节点集群)与 Spark(1.5.1,使用 hive 编译,hadoop 与 scala 2.11,4 节点集群)集成,有 hdfs 进入方程(hadoop 2.7,4 节点)和thrift jdbc 服务器和 elasticsearch-hadoop-2.2.0-m1.jar

因此,在 ES 上执行语句有两种方式。

  1. Spark SQL 与 Scala

    val conf = new  SparkConf().setAppName("QueryRemoteES").setMaster("spark://node1:37077").set("spark.executor.memory","2g")
    conf.set("spark.logConf", "true")
    conf.set("spark.cores.max","20")
    conf.set("es.index.auto.create", "false")
    conf.set("es.batch.size.bytes", "100mb")
    conf.set("es.batch.size.entries", "10000")
    conf.set("es.scroll.size", "10000")
    conf.set("es.nodes", "node2:39200")
    conf.set("es.nodes.discovery","true")
    conf.set("pushdown", "true")
    
    sc.addJar("executorLib/elasticsearch-hadoop-2.2.0-m1.jar")
    sc.addJar("executorLib/scala-library-2.10.1.jar")
    
    sqlContext.sql("CREATE TEMPORARY TABLE geoTab USING org.elasticsearch.spark.sql OPTIONS (resource 'geo_2/kafkain')" )
    
    val all: DataFrame = sqlContext.sql("SELECT count(*) FROM geoTab WHERE transmittersID='262021306841042'")
    .....
    
  2. Thrift 服务器(在 spark 上执行的代码)

    ....
    polledDataSource = new ComboPooledDataSource()
    polledDataSource.setDriverClass("org.apache.hive.jdbc.HiveDriver")
    polledDataSource.setJdbcUrl("jdbc:hive2://node1:30001")
    polledDataSource.setMaxPoolSize(5)
    dbConnection = polledDataSource.getConnection
    dbStatement = dbConnection.createStatement
    
    val dbResult = dbStatement.execute("CREATE TEMPORARY EXTERNAL TABLE IF NOT EXISTS geoDataHive6(transmittersID STRING,lat DOUBLE,lon DOUBLE) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource' = 'geo_2/kafkain','es.query'='{\"query\":{\"term\":{\"transmittersID\":\"262021306841042\"}}}','es.nodes'='node2','es.port'='39200','es.nodes.discovery' = 'false','es.mapping.include' = 'trans*,point.*','es.mapping.names' = 'transmittersID:transmittersID,lat:point.lat,lon:point.lon','pushdown' = 'true')")
    
    dbStatement.setFetchSize(50000)
    dbResultSet = dbStatement.executeQuery("SELECT count(*) FROM geoDataHive6")
    .....
    

我有以下问题,由于它们是连接的,我决定将它们打包成一个问题:

  1. 似乎使用 Spark SQL 的方法支持 WHERE 后面的下推(是否指定​​了 es.query),执行时间相同并且可以接受。但是解决方案 1 绝对不支持聚合函数的 pushdow,即呈现的 count(*) 不在 ES 一侧执行,而是在检索到所有数据之后 - ES 返回行并且 Spark SQL 对它们进行计数。请确认这是否是正确的行为

  2. 第一个解决方案的行为很奇怪,无论下推是真还是假,时间都是相等的

  3. 解决方案 2 似乎不支持下推,我尝试以什么方式指定子查询并不重要,无论是表定义的一部分还是语句的 WHERE 子句,似乎只是获取所有巨大的索引然后对其进行数学计算。是不是 Thrift-hive 不能对 ES 进行下推

  4. 我想在弹性搜索中跟踪查询,我确实设置了以下内容:

    //logging.yml
    index.search.slowlog: TRACE, index_search_slow_log_file
    index.indexing.slowlog: TRACE, index_indexing_slow_log_file
    
    additivity:
      index.search.slowlog: true
      index.indexing.slowlog: true
    

所有 index.search.slowlog.threshold.query、index.search.slowlog.threshold.fetch 甚至 index.indexing.slowlog.threshold.index 都设置为 0ms。而且我确实在慢日志文件中看到了从感觉执行的常见语句(所以它可以工作)。但我没有看到针对 ES 执行的 Spark SQL 或 thrift 语句。我想这些是 scan&scroll 语句,因为如果我从感觉执行 scan&scroll,这些也不会被记录。是否有可能以某种方式在 ES 一侧跟踪扫描和滚动?

4

2 回答 2

2
  1. 据我所知,这是一种预期的行为。我所知道的所有来源的行为方式都完全相同,而且直觉上它是有道理的。SparkSQL 专为分析查询而设计,在本地获取数据、缓存和处理更有意义。另请参阅spark 谓词下推是否适用于 JDBC?

  2. 我认为这根本没有conf.set("pushdown", "true")任何影响。如果要配置特定于连接的设置,则应像OPTION第二种情况一样将其作为映射传递。使用es前缀应该也可以

  3. 这确实很奇怪。Martin Senne报告了 PostgreSQL 的类似问题,但我无法重现。

于 2015-11-13T08:26:15.107 回答
1

我在 elasticsearch 讨论组与 Costin Leau 进行了讨论后,他指出了以下内容,我应该与您分享:

您的设置存在许多问题:

  1. 您提到使用 Scala 2.11,但使用的是 Scala 2.10。请注意,如果您想选择您的 Scala 版本,elasticsearch-spark应该使用,elasticsearch-hadoop仅提供 Scala 2.10 的二进制文件。

  2. 下推功能只能通过 Spark 数据源获得。如果您不使用这种类型的声明,pushdown则不会传递给 ES(这就是 Spark 的工作方式)。因此,声明pushdown存在无关紧要。

  3. 请注意,ES-Hadoop 中的所有参数是如何开始的- 唯一的es.例外是Spark DataSource 特定的(遵循 Spark 约定,因为这些是专用 DS 中的 Spark 特定功能)pushdownlocation

  4. 使用临时表确实算作数据源,但您需要在pushdown那里使用。如果你不这样做,它会默认激活,因此你看不到你的运行之间的区别;你没有改变任何相关的参数。

  5. 计数和其他聚合不会被 Spark 下推。根据 Databricks 团队的说法,未来可能会有一些东西,但目前还没有。对于计数,您可以使用 进行快速调用dataFrame.rdd.esCount。但这是一个特例。

  6. 我不确定 Thrift 服务器是否真的算作 DataSource,因为它从 Hive 加载数据。您可以通过启用将org.elasticsearch.hadoop.spark包登录到 DEBUG 来仔细检查这一点。您应该查看 SQL 是否确实被转换为 DSL。

我希望这有帮助!

于 2015-11-26T13:47:17.143 回答