I am trying to stream data from MongoDB to Elasticsearch using Kafka Connect.
The data that the MongoDB source connector writes to Kafka looks like this:
{
"updatedAt" : {
"$date" : 1591596275939
},
"createdAt" : {
"$date" : 1362162600000
},
"name" : "my name",
"_id" : {
"$oid" : "5ee0cc7e0c3273f3d4a3c20f"
},
"documentId" : "mydoc1",
"age" : 20,
"language" : "English",
"validFrom" : {
"$date" : 978307200000
},
"remarks" : [
"remarks"
],
"married" : false
}
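I have the following two problems when saving this data to Elasticsearch:
- _id is an object, and I want to use the value of the "documentId" key as the _id in Elasticsearch.
- Each date is an object with a $date key (epoch milliseconds), and I don't know how to convert it to a normal date.
Can anyone point me in the right direction on these two issues?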
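To make the desired result concrete, here is a minimal Python sketch of the transformation I would like to happen somewhere between Kafka and Elasticsearch. It is only an illustration and not connector code: the function names flatten_extended_json and to_es_action are hypothetical, and rendering dates as ISO-8601 UTC strings is my assumption about what a "normal date" should look like.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def flatten_extended_json(value):
    """Recursively unwrap MongoDB extended JSON wrappers.

    {"$date": <epoch millis>} becomes an ISO-8601 UTC string (assumed format),
    {"$oid": <hex>} becomes the plain hex string.
    """
    if isinstance(value, dict):
        if set(value) == {"$date"}:
            return (EPOCH + timedelta(milliseconds=value["$date"])).isoformat()
        if set(value) == {"$oid"}:
            return value["$oid"]
        return {k: flatten_extended_json(v) for k, v in value.items()}
    if isinstance(value, list):
        return [flatten_extended_json(v) for v in value]
    return value


def to_es_action(doc, id_field="documentId"):
    """Return (document id, document body) for indexing.

    The id is taken from id_field; the Mongo "_id" is dropped from the body
    because Elasticsearch does not accept "_id" inside the document source.
    """
    source = flatten_extended_json(doc)
    source.pop("_id", None)
    return source[id_field], source
```

For the sample document above, to_es_action would return "mydoc1" as the id, and fields such as validFrom would become plain date strings instead of nested objects.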
MongoDB source configuration
{
"tasks.max" : "5",
"change.stream.full.document" : "updateLookup",
"name" : "mongodb-source",
"value.converter" : "org.apache.kafka.connect.storage.StringConverter",
"collection" : "collection",
"poll.max.batch.size" : "1000",
"connector.class" : "com.mongodb.kafka.connect.MongoSourceConnector",
"batch.size" : "1000",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"false",
"connection.uri" : "mongodb://connection",
"publish.full.document.only" : "true",
"database" : "databasename",
"poll.await.time.ms" : "5000",
"topic.prefix" : "mongodb"
}
Elasticsearch sink configuration
{
"write.method" : "upsert",
"errors.deadletterqueue.context.headers.enable" : "true",
"name" : "elasticsearch-sink",
"connection.password" : "password",
"topic.index.map" : "mongodb.databasename.collection:elasticindexname",
"connection.url" : "http://localhost:9200",
"errors.log.enable" : "true",
"flush.timeout.ms" : "20000",
"errors.log.include.messages" : "true",
"key.ignore" : "false",
"type.name" : "_doc",
"key.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"false",
"tasks.max" : "1",
"batch.size" : "100",
"schema.ignore" : "true",
"schema.enable" : "false",
"connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"read.timeout.ms" : "6000",
"connection.username" : "elastic",
"topics" : "mongodb.databasename.collection",
"proxy.host": "localhost",
"proxy.port": "8080"
}
Exception
Caused by: org.apache.kafka.connect.errors.DataException: MAP is not supported as the document id.
at io.confluent.connect.elasticsearch.DataConverter.convertKey(DataConverter.java:107)
at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:182)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.tryWriteRecord(ElasticsearchWriter.java:291)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:276)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:174)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
... 10 more
Connector links:
https://docs.mongodb.com/kafka-connector/master/kafka-source/
https://docs.confluent.io/current/connect/kafka-connect-elasticsearch