我们正在尝试将 ES(1.7.2,4 节点集群)与 Spark(1.5.1,使用 hive 编译,hadoop 与 scala 2.11,4 节点集群)集成,有 hdfs 进入方程(hadoop 2.7,4 节点)和thrift jdbc 服务器和 elasticsearch-hadoop-2.2.0-m1.jar
因此,在 ES 上执行语句有两种方式。
Spark SQL 与 Scala
val conf = new SparkConf().setAppName("QueryRemoteES").setMaster("spark://node1:37077").set("spark.executor.memory","2g") conf.set("spark.logConf", "true") conf.set("spark.cores.max","20") conf.set("es.index.auto.create", "false") conf.set("es.batch.size.bytes", "100mb") conf.set("es.batch.size.entries", "10000") conf.set("es.scroll.size", "10000") conf.set("es.nodes", "node2:39200") conf.set("es.nodes.discovery","true") conf.set("pushdown", "true") sc.addJar("executorLib/elasticsearch-hadoop-2.2.0-m1.jar") sc.addJar("executorLib/scala-library-2.10.1.jar") sqlContext.sql("CREATE TEMPORARY TABLE geoTab USING org.elasticsearch.spark.sql OPTIONS (resource 'geo_2/kafkain')" ) val all: DataFrame = sqlContext.sql("SELECT count(*) FROM geoTab WHERE transmittersID='262021306841042'") .....
Thrift 服务器(在 spark 上执行的代码)
.... polledDataSource = new ComboPooledDataSource() polledDataSource.setDriverClass("org.apache.hive.jdbc.HiveDriver") polledDataSource.setJdbcUrl("jdbc:hive2://node1:30001") polledDataSource.setMaxPoolSize(5) dbConnection = polledDataSource.getConnection dbStatement = dbConnection.createStatement val dbResult = dbStatement.execute("CREATE TEMPORARY EXTERNAL TABLE IF NOT EXISTS geoDataHive6(transmittersID STRING,lat DOUBLE,lon DOUBLE) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource' = 'geo_2/kafkain','es.query'='{\"query\":{\"term\":{\"transmittersID\":\"262021306841042\"}}}','es.nodes'='node2','es.port'='39200','es.nodes.discovery' = 'false','es.mapping.include' = 'trans*,point.*','es.mapping.names' = 'transmittersID:transmittersID,lat:point.lat,lon:point.lon','pushdown' = 'true')") dbStatement.setFetchSize(50000) dbResultSet = dbStatement.executeQuery("SELECT count(*) FROM geoDataHive6") .....
我有以下问题,由于它们是连接的,我决定将它们打包成一个问题:
似乎使用 Spark SQL 的方法支持 WHERE 后面的下推(是否指定了 es.query),执行时间相同并且可以接受。但是解决方案 1 绝对不支持聚合函数的 pushdow,即呈现的 count(*) 不在 ES 一侧执行,而是在检索到所有数据之后 - ES 返回行并且 Spark SQL 对它们进行计数。请确认这是否是正确的行为
第一个解决方案的行为很奇怪,无论下推是真还是假,时间都是相等的
解决方案 2 似乎不支持下推,我尝试以什么方式指定子查询并不重要,无论是表定义的一部分还是语句的 WHERE 子句,似乎只是获取所有巨大的索引然后对其进行数学计算。是不是 Thrift-hive 不能对 ES 进行下推
我想在弹性搜索中跟踪查询,我确实设置了以下内容:
//logging.yml index.search.slowlog: TRACE, index_search_slow_log_file index.indexing.slowlog: TRACE, index_indexing_slow_log_file additivity: index.search.slowlog: true index.indexing.slowlog: true
所有 index.search.slowlog.threshold.query、index.search.slowlog.threshold.fetch 甚至 index.indexing.slowlog.threshold.index 都设置为 0ms。而且我确实在慢日志文件中看到了从感觉执行的常见语句(所以它可以工作)。但我没有看到针对 ES 执行的 Spark SQL 或 thrift 语句。我想这些是 scan&scroll 语句,因为如果我从感觉执行 scan&scroll,这些也不会被记录。是否有可能以某种方式在 ES 一侧跟踪扫描和滚动?