0

我有 Kafka 在融合云上运行,我可以使用 Node.js 客户端生成数据,数据作为字符串发送,并且我在融合云中获得以下字段。在此处输入图像描述

然后,我创建了一个ElasticsearchSink Connector并将其连接到弹性搜索云。如果我没有在弹性搜索中创建任何映射,则数据传输成功,但格式是这样的。

"_source" : {
          "booked" : false,
          "phone_number" : "919191919191",
          "location" : {
            "lon" : 60.23,
            "lat" : 78.233
          }
        }

现在的问题是如果我想运行任何geo queries它不会允许我并给出以下错误:

"root_cause" : [
      {
        "type" : "query_shard_exception",
        "reason" : "failed to find geo_point field [location]",
        "index_uuid" : "C8Xxu9QlTMKN4Lk1LjpOmQ",
        "index" : "locations"
      }

原因是动态映射不支持 geo_field。因此,现在当我尝试在创建索引时为弹性搜索创建自定义映射时,如下所示:

PUT /locations
{
  "mappings": {
    "properties": {
      "phone_number": {
        "type": "text"
      },
      "booked": {
        "type": "boolean"
      },
      "location": {
        "type": "geo_point"
        }
      }
  }
}

然后融合连接器失败并显示以下错误:

There is a mapping collision in your index: Can't merge a non object mapping with an object mapping.

我也尝试过booked作为一个text领域,但事情似乎并不奏效。我没有在 Confluent 云上强制执行任何架构。这是来自融合云的一些基本配置。 在此处输入图像描述

如何强制映射以便我可以geo queries在 Elastic Search 中运行?

更新:这个问题仍然存在主要是因为发送到 Kafka 的数据格式

{
    "phone_number": "919191919191",
    "location": {
            "lat": 78.233,
            "lon": 60.23
    },
    "booked": false,
}

{
    "phone_number": "+919191919190",
    "location": " 78.233, 60.23",
    "booked": false,
}

两种格式都无法映射到上面定义的映射ElasticSearchconnector sink显示以下错误:

Received Illegal Argument Exception from Elasticsearch: One of your fields' type does not match the mapped type in Elasticsearch
4

1 回答 1

0

Confluent Cloud 在找出与模式相关的事情并将该发现存储在其内置的模式注册表中方面做了一项偷偷摸摸的工作。恐怕您的映射不起作用,因为:

  1. 连接器仍在尝试发送以前存储的数据。
  2. 以前存储的数据仍附加到旧模式。
  3. Confluent Cloud 没有意识到架构已经发展。

尝试通过在 Confluent Cloud 中创建一个新环境来重置您的设置(这将强制创建一个新的 SR 实例),或者可能使用一个全新的 Kafka 主题。无论哪种方式,都从新数据开始。连接器总是试图保持乐观并确保没有数据丢失,但在此过程中这可能是错误的,因为模式已经发展。

先在 Elasticsearch 上设置映射。完成此操作后,连接器将映射到正确的架构。此外,由于某种原因,它仅在我在 Elasticsearch 索引的映射上使用动态模板时才有效。

于 2020-10-26T13:40:42.090 回答