I am trying to write a collection of objects from Spark to Elasticsearch. I have to satisfy two requirements:
- the documents are already serialized in JSON and should be written as is
- the Elasticsearch document _id should be provided
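For concreteness, a serialized record looks roughly like this (every field name except _id is made up for illustration):
{"_id": "7f48c8ee6a8a", "field1": "value1", "field2": "value2"}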
Here is what I have tried so far.
saveJsonToEs()
I tried to use saveJsonToEs() like this (the serialized documents contain a field _id with the desired Elasticsearch ID):
val rdd: RDD[String] = job.map{ r => r.toJson() }
val cfg = Map(
("es.resource", "myindex/mytype"),
("es.mapping.id", "_id"),
("es.mapping.exclude", "_id")
)
EsSpark.saveJsonToEs(rdd, cfg)
but the elasticsearch-hadoop library throws this exception:
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: When writing data as JSON, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...
at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:60)
at org.elasticsearch.hadoop.rest.InitializationUtils.validateSettings(InitializationUtils.java:253)
If I remove es.mapping.exclude but keep es.mapping.id and send the JSON with the _id inside (like {"_id":"blah",...}):
val cfg = Map(
("es.resource", "myindex/mytype"),
("es.mapping.id", "_id")
)
EsSpark.saveJsonToEs(rdd, cfg)
I get this error:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 84.0 failed 4 times, most recent failure: Lost task 15.3 in stage 84.0 (TID 628, 172.31.35.69, executor 1): org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [172.31.30.184:9200] returned Bad Request(400) - Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters.; Bailing out..
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
at org.apache.spark.scheduler.Task.run(Task.scala:112)
...
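If I read this error correctly, since _id is a metadata field, Elasticsearch expects it in the bulk action line rather than in the document source, i.e. something like this (illustrative values):
{"index": {"_index": "myindex", "_type": "mytype", "_id": "blah"}}
{"field1": "value1", "field2": "value2"}
So the connector would have to strip _id from the source and move it into the action line, which is what the es.mapping.exclude attempt above was supposed to achieve.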
When I try to send this id in a different field (like {"superID":"blah",...}):
val cfg = Map(
("es.resource", "myindex/mytype"),
("es.mapping.id", "superID")
)
EsSpark.saveJsonToEs(rdd, cfg)
it fails to extract the field:
17/12/20 15:15:38 WARN TaskSetManager: Lost task 8.0 in stage 84.0 (TID 586, 172.31.33.56, executor 0): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [JsonExtractor for field [superId]] cannot extract value from entity [class java.lang.String] | instance [{...,"superID":"7f48c8ee6a8a"}]
at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:106)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:161)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
When I remove both es.mapping.id and es.mapping.exclude from the configuration, it works, but the document ids are generated by Elasticsearch (which violates requirement 2):
val rdd: RDD[String] = job.map{ r => r.toJson() }
val cfg = Map(
  ("es.resource", "myindex/mytype")
)
EsSpark.saveJsonToEs(rdd, cfg)
saveToEsWithMeta()
There is another function, saveToEsWithMeta(), that allows providing _id and other metadata for insertion: it solves requirement 2 but fails on requirement 1.
val rdd: RDD[(String, String)] = job.map{
  r => r._id -> r.toJson()
}
val cfg = Map(
  ("es.resource", "myindex/mytype")
)
EsSpark.saveToEsWithMeta(rdd, cfg)
In fact, Elasticsearch is not even able to parse what elasticsearch-hadoop sends:
Caused by: org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [<es_host>:9200] returned Bad Request(400) - failed to parse; Bailing out..
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
at org.apache.spark.scheduler.Task.run(Task.scala:112)
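My guess (I have not captured the actual request) is that, without being told the values are already JSON, the connector serializes each string value a second time, so the document source in the bulk request ends up as an escaped string literal instead of an object, roughly:
{"index": {"_id": "blah"}}
"{\"_id\": \"blah\", \"field1\": \"value1\"}"
but that is speculation on my part.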
The question
Is it possible to write a collection of (documentID, serializedDocument) from Spark to Elasticsearch (using elasticsearch-hadoop)?
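In other words, I am looking for the elasticsearch-hadoop equivalent of issuing one index request per pair via the plain REST API (illustrative):
PUT myindex/mytype/<documentID>
<serializedDocument written verbatim as the request body>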
P.S. I am using Elasticsearch 5.6.3 and Spark 2.1.1.