1

v6.2使用规定的 spark 连接器从 Elasticsearch 读取到 sparkorg.elasticsearch:elasticsearch-spark-20_2.11:6.3.2非常慢。这是来自具有索引的 3 节点 ES 集群:

curl https://server/_cat/indices?v
green  open   db MmVwAwYfTz4eE_L-tncbwQ   5   1  199983131      9974871    105.1gb         51.8gb

在(10 个节点,1tb 内存,>50 个 VCPU)spark 集群上读取:

val query = """{
  "query": {
    "match_all": {}
  }
}"""

val df = spark.read
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes","server")
  .option("es.port", "443")
  .option("es.net.ssl","true")
  .option("es.nodes.wan.only","true")
  .option("es.input.use.sliced.partitions", "false")
  .option("es.scroll.size", "1000")
  .option("es.read.field.include", "f1,f2,f3")
  .option("es.query",query)
  .load("db")

df.take(1)

执行需要 10 分钟。 在此处输入图像描述

这是(缓慢地)它应该如何工作,还是我做错了什么?

4

1 回答 1

1

这不是它应该有多慢,答案可以在您分享的屏幕截图中找到:

Spark UI 中的列Stages: Succeeded/Total只显示了一个运行读取操作的任务,我认为这不是您所期望的,否则拥有整个集群的意义何在。

我遇到了同样的问题,我花了一段时间才弄清楚 Spark 将一个任务(分区)与 Elasticsearch 索引中的每个分片相关联,

那里我们有我们的答案,为了更快,我们应该并行化这个过程,怎么做?通过将我们的源索引分配到多个分片中。

默认情况下,Elasticsearch 创建一个带有一个分片的索引,但是,可以将其个性化,如下所示:

PUT /index-name
{
     "settings": {
     "index": {
     "number_of_shards": x,  
     "number_of_replicas": xx 
    }
  }
}

分片的数量可能高于 Elastic 节点的数量,这对 Spark 来说都是透明的。如果索引已经存在,请尝试创建一个新的 inex,然后使用 Elasticsearch Reindex API

我希望这能解决你的问题。

于 2021-05-16T11:01:05.167 回答