
I am trying to stream data from MongoDB to Elasticsearch using Kafka Connect.

The data flowing into Kafka through the MongoDB connector looks like this:

{
   "updatedAt" : {
      "$date" : 1591596275939
   },
   "createdAt" : {
      "$date" : 1362162600000
   },
   "name" : "my name",
   "_id" : {
      "$oid" : "5ee0cc7e0c3273f3d4a3c20f"
   },
   "documentId" : "mydoc1",
   "age" : 20,
   "language" : "English",
   "validFrom" : {
      "$date" : 978307200000
   },
   "remarks" : [
      "remarks"
   ],
   "married" : false
}

I have two problems when saving this data to Elasticsearch:

  1. _id is an object, and I want Elasticsearch to use the "documentId" field as the document _id instead (see the SMT sketch below).
  2. Each date is an object with a $date key, and I don't know how to convert it into a normal date (see the note after the source config).

Can anyone point me in the right direction on these two problems?
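For problem 1, I am wondering whether sink-side single message transforms (SMTs) would work: ValueToKey to copy documentId from the value into the record key, then ExtractField$Key to unwrap it into a plain string. A minimal, untested sketch of what I would add to the sink config (the transform names copyDocId/extractDocId are just labels I made up):

{
   "transforms" : "copyDocId,extractDocId",
   "transforms.copyDocId.type" : "org.apache.kafka.connect.transforms.ValueToKey",
   "transforms.copyDocId.fields" : "documentId",
   "transforms.extractDocId.type" : "org.apache.kafka.connect.transforms.ExtractField$Key",
   "transforms.extractDocId.field" : "documentId"
}

With "key.ignore" : "false" the sink should then use the extracted string as the Elasticsearch document _id, but I have not been able to confirm this yet.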

MongoDB source config

{
   "tasks.max" : "5",
   "change.stream.full.document" : "updateLookup",
   "name" : "mongodb-source",
   "value.converter" : "org.apache.kafka.connect.storage.StringConverter",
   "collection" : "collection",
   "poll.max.batch.size" : "1000",
   "connector.class" : "com.mongodb.kafka.connect.MongoSourceConnector",
   "batch.size" : "1000",
   "key.converter" : "org.apache.kafka.connect.storage.StringConverter",
   "key.converter.schemas.enable":"false",
   "value.converter.schemas.enable":"false",
   "connection.uri" : "mongodb://connection",
   "publish.full.document.only" : "true",
   "database" : "databasename",
   "poll.await.time.ms" : "5000",
   "topic.prefix" : "mongodb"
}
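Regarding problem 2: if I read the MongoDB connector docs correctly, versions 1.3 and newer support an output.json.formatter option, and setting it to SimplifiedJson is supposed to render $date values as ISO-8601 strings (and $oid values as plain strings, which would also help with problem 1). A sketch of the extra line for the source config above, assuming a new enough connector version:

   "output.json.formatter" : "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson"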

Elasticsearch sink config

{
   "write.method" : "upsert",
   "errors.deadletterqueue.context.headers.enable" : "true",
   "name" : "elasticsearch-sink",
   "connection.password" : "password",
   "topic.index.map" : "mongodb.databasename.collection:elasticindexname",
   "connection.url" : "http://localhost:9200",
   "errors.log.enable" : "true",
   "flush.timeout.ms" : "20000",
   "errors.log.include.messages" : "true",
   "key.ignore" : "false",
   "type.name" : "_doc",
   "key.converter" : "org.apache.kafka.connect.json.JsonConverter",
   "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
   "key.converter.schemas.enable":"false",
   "value.converter.schemas.enable":"false",
   "tasks.max" : "1",
   "batch.size" : "100",
   "schema.ignore" : "true",
   "schema.enable" : "false",
   "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
   "read.timeout.ms" : "6000",
   "connection.username" : "elastic",
   "topics" : "mongodb.databasename.collection",
   "proxy.host": "localhost",
   "proxy.port": "8080"
}
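Since the sink has "schema.ignore" : "true", I assume Elasticsearch will not get date mappings from the connector, so the index would need them up front. A sketch of the mapping I would create, assuming Elasticsearch 7.x and that the date fields arrive as ISO-8601 strings after the source-side change above:

PUT /elasticindexname
{
   "mappings" : {
      "properties" : {
         "createdAt" : { "type" : "date" },
         "updatedAt" : { "type" : "date" },
         "validFrom" : { "type" : "date" }
      }
   }
}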

Exception

Caused by: org.apache.kafka.connect.errors.DataException: MAP is not supported as the document id.
        at io.confluent.connect.elasticsearch.DataConverter.convertKey(DataConverter.java:107)
        at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:182)
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.tryWriteRecord(ElasticsearchWriter.java:291)
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:276)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:174)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
        ... 10 more
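If I read DataConverter.convertKey right, this fails because the record key, once deserialized by the JsonConverter, is a map (a JSON object) rather than a primitive. I believe the source connector's default key is the change stream event's _id (the resume token), so the key presumably looks something like this (the exact contents are my assumption):

{
   "_id" : {
      "_data" : "826EE0..."
   }
}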

Connector links:

https://docs.mongodb.com/kafka-connector/master/kafka-source/
https://docs.confluent.io/current/connect/kafka-connect-elasticsearch