在独立的火花中,我试图从数据框写入 Elasticsearch。虽然我可以让它工作,但我不知道如何写入格式为“index_name-{ts_col:{YYYY-mm-dd}}”的动态命名索引,其中“ts_col”是数据集中的日期时间字段。
我看过各种各样的帖子说这种类型的语法应该可以工作,但是当我尝试它时,我得到了底部包含的错误。它似乎在创建索引之前首先检查索引是否存在,但它将未格式化的索引名称传递给该索引名称,而不是动态创建的索引名称。我尝试使用 python elasticsearch 模块首先使用相同的语法创建索引,但它无法处理动态索引名称。
是否有任何可用的解决方案,或者我是否必须在 spark 中遍历我的数据集以查找表示的每个日期,创建我需要的索引,然后一次写入每个索引?我错过了一些明显的东西吗?Logstash 很容易做到这一点,我不明白为什么我不能让它在 Spark 中工作。
这是我正在使用的写入命令(也尝试了它的不同变体):
df.write.format("org.elasticsearch.spark.sql")
.option('es.index.auto.create', 'true')
.option('es.resource', 'index_name-{ts_col:{YYYY.mm.dd}}/type_name')
.option('es.mapping.id', 'es_id')
.save()
这是我正在使用的罐子:
elasticsearch-hadoop-5.0.0/dist/elasticsearch-spark-20_2.11-5.0.0.jar
这是我使用上面的 write 命令时遇到的错误:
错误 NetworkClient: 节点 [##.##.##.##:9200] 失败(无效的目标 URI HEAD@null/index_name-{ts_col:{YYYY.mm.dd}}/type_name);选择下一个节点 [##.##.##.##:9200]
...
...
Py4JJavaError:调用 o114.save 时出错。:org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException:连接错误(检查网络和/或代理设置)-所有节点都失败;
如果我将覆盖设置为 True,我会得到:
Py4JJavaError:调用 o58.save 时出错。:org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest:在 org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java 的 org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:488) 没有这样的索引空:446)在 org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:436) 在 org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:363) 在 org.elasticsearch.hadoop.rest。 ScrollQuery.hasNext(ScrollQuery.java:92) at org.elasticsearch.hadoop.rest.RestRepository.delete(RestRepository.java:455) at org.elasticsearch.spark.sql.ElasticsearchRelation.insert(DefaultSource.scala:500) at org .elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:
如果我尝试使用 Elasticsearch python 客户端提前创建索引,我会得到:
RequestError: TransportError(400, u'invalid_index_name_exception', u'无效索引名 [index_name-{ts_col:YYYY.MM.dd}], 必须小写')