I have an application in which I read CSV files, apply some transformations, and then push the results to Elasticsearch directly from Spark, like this:
input.write.format("org.elasticsearch.spark.sql")
  .mode(SaveMode.Append)
  .option("es.resource", "{date}/" + type)
  .save()
I have several nodes, and on each node I run 5-6 spark-submit jobs that push to Elasticsearch.
I frequently get the following error:
Could not write all entries [13/128] (Maybe ES was overloaded?). Error sample (first [5] error messages):
rejected execution of org.elasticsearch.transport.TransportService$7@32e6f8f8 on EsThreadPoolExecutor[bulk, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@4448a084[Running, pool size = 4, active threads = 4, queued tasks = 200, completed tasks = 451515]]
My Elasticsearch cluster has the following stats:
Nodes - 9 (1 TB space, RAM >= 15 GB, more than 8 cores per node)
I have already modified the following Elasticsearch (es-hadoop) parameters:
spark.es.batch.size.bytes=5000000
spark.es.batch.size.entries=5000
spark.es.batch.write.refresh=false
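For reference, a hedged sketch of how these settings are usually supplied: either through the Spark configuration (where es-hadoop strips the leading "spark."), or per write via DataFrameWriter options without the prefix. The values below simply mirror the ones listed above and can be pasted into spark-shell:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Same keys as listed above, set programmatically on the Spark config;
// es-hadoop drops the "spark." prefix and reads them as es.batch.* settings.
val conf = new SparkConf()
  .set("spark.es.batch.size.bytes", "5000000")
  .set("spark.es.batch.size.entries", "5000")
  .set("spark.es.batch.write.refresh", "false")

val spark = SparkSession.builder().config(conf).getOrCreate()

// Equivalently, the unprefixed keys can be passed per write:
// input.write.format("org.elasticsearch.spark.sql")
//   .option("es.batch.size.entries", "5000")
//   ...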
Can anyone suggest what I should change to get rid of these errors?