1

我正在尝试从 Spark 在 Elasticsearch 中编写对象集合。我必须满足两个要求:

  1. 文档已经用 JSON 序列化,应该按原样编写
  2. _id应提供Elasticsearch 文档

这是我到目前为止所尝试的。

saveJsonToEs()

我尝试这样使用saveJsonToEs()(序列化文档包含_id具有所需 Elasticsearch ID 的字段):

val rdd: RDD[String] = job.map{ r => r.toJson() }

val cfg = Map(
  ("es.resource", "myindex/mytype"),
  ("es.mapping.id", "_id"),
  ("es.mapping.exclude", "_id")
)

EsSpark.saveJsonToEs(rdd, cfg)

但是elasticsearch-hadoop图书馆给出了这个例外:

Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: When writing data as JSON, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...
    at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:60)
    at org.elasticsearch.hadoop.rest.InitializationUtils.validateSettings(InitializationUtils.java:253)

如果我删除es.mapping.exclude但保留es.mapping.id并发送带有_id内部的 JSON(如{"_id":"blah",...}

val cfg = Map(
  ("es.resource", "myindex/mytype"),
  ("es.mapping.id", "_id")
)

EsSpark.saveJsonToEs(rdd, cfg)

我收到此错误:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 84.0 failed 4 times, most recent failure: Lost task 15.3 in stage 84.0 (TID 628, 172.31.35.69, executor 1): org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [172.31.30.184:9200] returned Bad Request(400) - Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters.; Bailing out..
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
    at org.apache.spark.scheduler.Task.run(Task.scala:112)
...

当我尝试将此 id 作为不同的字段发送时(例如{"superID":"blah",..."

 val cfg = Map(
  ("es.resource", "myindex/mytype"),
  ("es.mapping.id", "superID")
)

EsSpark.saveJsonToEs(rdd, cfg)

它无法提取该字段:

17/12/20 15:15:38 WARN TaskSetManager: Lost task 8.0 in stage 84.0 (TID 586, 172.31.33.56, executor 0): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [JsonExtractor for field [superId]] cannot extract value from entity [class java.lang.String] | instance [{...,"superID":"7f48c8ee6a8a"}]
    at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:106)
    at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
    at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
    at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:161)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)

当我从配置中删除es.mapping.ides.mapping.exclude从配置中删除时,它可以工作,但文档 ID 是由 Elasticsearch 生成的(这违反了要求 2):

val rdd: RDD[String] = job.map{ r => r.toJson() }

val cfg = Map(
  ("es.resource", "myindex/mytype"),
)

EsSpark.saveJsonToEs(rdd, cfg)

saveToEsWithMeta()

还有另一个功能要提供_id和其他用于插入的元数据saveToEsWithMeta():允许解决需求 2 但因需求 1 而失败。

val rdd: RDD[(String, String)] = job.map{
  r => r._id -> r.toJson()
}

val cfg = Map(
  ("es.resource", "myindex/mytype"),
)

EsSpark.saveToEsWithMeta(rdd, cfg)

事实上,Elasticsearch 甚至无法解析elasticsearch-hadoop发送的内容:

Caused by: org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [<es_host>:9200] returned Bad Request(400) - failed to parse; Bailing out..
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
    at org.apache.spark.scheduler.Task.run(Task.scala:112)

问题

是否可以将 Spark 的集合写入(documentID, serializedDocument)Elasticsearch(使用elasticsearch-hadoop)?

PS 我正在使用 Elasticsearch 5.6.3 和 Spark 2.1.1。

4

5 回答 5

4

最后我发现了问题:这是配置中的错字。

[JsonExtractor for field [superId]] cannot extract value from entity [class java.lang.String] | instance [{...,"superID":"7f48c8ee6a8a"}]

它正在寻找一个领域superID,但只有superID(注意案例)。在问题中它也有点误导,因为在代码中它看起来像"es.mapping.id", "superID"(这是不正确的)。

实际的解决方案就像Levi Ramsey建议的那样:

val json = """{"foo":"bar","superID":"deadbeef"}"""

val rdd = spark.makeRDD(Seq(json))
val cfg = Map(
  ("es.mapping.id", "superID"),
  ("es.resource", "myindex/mytype")
)
EsSpark.saveJsonToEs(rdd, cfg = cfg)

不同之处在于es.mapping.id不能_id(如原始帖子_id中所述,元数据和 Elasticsearch 不接受它)。

自然这意味着superID应该将新字段添加到映射中(除非映射是动态的)。如果在索引中存储额外的字段是一种负担,还应该:

非常感谢Alex Savitsky指出了正确的方向。

于 2017-12-22T16:17:57.720 回答
2

您是否尝试过类似的方法:

val rdd: RDD[String] = job.map{ r => r.toJson() }
val cfg = Map(
  ("es.mapping.id", "_id")
)
rdd.saveJsonToEs("myindex/mytype", cfg)

我已经针对 ES 1.7 进行了测试(使用 elasticsearch-hadoop(连接器版本 2.4.5))并且它有效。

于 2017-12-20T20:12:07.010 回答
1

这可以通过将ES_INPUT_JSON选项传递给cfg 参数 map 并返回一个元组来完成,该元组包含文档 id 作为第一个元素,并且在 JSON 中序列化的文档作为 map 函数的第二个元素。

我用"org.elasticsearch" %% "elasticsearch-spark-20" % "[6.0,7.0["Elasticsearch 6.4对其进行了测试

import org.elasticsearch.hadoop.cfg.ConfigurationOptions.{ES_INPUT_JSON, ES_NODES}
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._

job
  .map{ r => (r._id, r.toJson()) }
  .saveToEsWithMeta(
    "myindex/mytype",
    Map(
      ES_NODES -> "https://localhost:9200",
      ES_INPUT_JSON -> true.toString
    )
  )
于 2019-03-13T16:26:35.160 回答
0
  1. 您可以使用saveToEs为了定义 customer_id 而不必保存 customer_id
  2. 注意 rdd 是RDD[Map]类型
val rdd:RDD[Map[String, Any]]=...
val cfg = Map(
  ("es.mapping.id", your_customer_id),
  ("es.mapping.exclude", your_customer_id)
)
EsSpark.saveToEs(rdd, your_es_index, cfg)
于 2020-07-20T09:10:48.330 回答
0

我花了几天的时间把头撞在墙上,试图弄清楚为什么saveToEsWithMeta当我使用这样的 ID 字符串时不起作用:

rdd.map(caseClassContainingJson =>
  (caseClassContainingJson._idWhichIsAString, caseClassContainingJson.jsonString)
)
.saveToEsWithMeta(s"$nationalShapeIndexName/$nationalShapeIndexType", Map(
  ES_INPUT_JSON -> true.toString
))

这将引发与 JSON 解析相关的错误,这会欺骗性地导致您认为问题出在您的 JSON 上,但随后您记录每个 JSON 并查看它们都是有效的。

事实证明,无论出于何种原因ES_INPUT_JSON -> true,元组的左侧(即 ID)也被解析为 JSON!

解决方案,JSON 对 ID 进行字符串化(将 ID 用额外的双引号括起来),以便将其解析为 JSON 工作:

rdd.map(caseClassContainingJson =>
  (
    Json.stringify(JsString(caseClassContainingJson._idWhichIsAString)), 
    caseClassContainingJson.jsonString
  )
)
.saveToEsWithMeta(s"$nationalShapeIndexName/$nationalShapeIndexType", Map(
  ES_INPUT_JSON -> true.toString
))
于 2019-07-22T15:58:17.647 回答