I am building an Elasticsearch-MongoDB sync using the MongoDB Kafka source connector and the Elasticsearch sink connector.
This is my source connector configuration:
name=mongo-source-connector11
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
#output.format.value=schema
# Connection and source configuration
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
connection.uri=mongodb://localhost:27017
database=localdb
collection=abcd
pipeline : [{'$match': { $or: [ { 'operationType': 'insert' }, { 'operationType': 'update' }, { 'operationType': 'delete' }, { 'operationType': 'replace' }, { 'operationType': 'rename' } ] } }]
poll.max.batch.size=1000
poll.await.time.ms=1000
output.schema.infer.value=true
output.format.key=schema
output.schema.key={\"name\":\"ClassroomId\",\"type\":\"record\",\"namespace\":\"com.mongoexchange.avro\",\"fields\":[{\"name\":\"documentKey._id\",\"type\":\"string\"}]}
change.stream.full.document=updateLookup
copy.existing=true
This is my Elasticsearch sink connector configuration:
name=elasticsearch-sink-connector11
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=localdb.abcd
connection.url=http://localhost:9200
topic.index.map=localdb.abcd:localdb
type.name="_doc"
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
key.ignore=false
This works, but in Elasticsearch, fields of type long come out like this:
"someLongField" :
{
"$numberLong" : "1006"
}
But I want the output to look like this:
"someLongField": 1006
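To make the difference between the two shapes concrete, here is a minimal Python sketch of the transformation being asked for: it unwraps the canonical MongoDB Extended JSON `$numberLong` / `$numberInt` wrappers into plain JSON numbers. It is only an illustration, assuming canonical Extended JSON v2 input; the function name `simplify` is hypothetical, and this is not how the connector's SimplifiedJson formatter is implemented.

```python
import json

# Canonical MongoDB Extended JSON v2 wrappers for integer types. Both store the
# number as a string, e.g. {"$numberLong": "1006"}. Other wrappers ($numberDouble,
# $numberDecimal, $date, ...) are deliberately left untouched in this sketch.
_INT_WRAPPERS = ("$numberLong", "$numberInt")


def simplify(value):
    """Recursively replace {"$numberLong": "<digits>"} style wrappers with ints."""
    if isinstance(value, dict):
        if len(value) == 1:
            key, raw = next(iter(value.items()))
            if key in _INT_WRAPPERS and isinstance(raw, str):
                return int(raw)
        return {k: simplify(v) for k, v in value.items()}
    if isinstance(value, list):
        return [simplify(v) for v in value]
    return value


extended = json.loads('{"someLongField": {"$numberLong": "1006"}}')
print(json.dumps(simplify(extended)))  # {"someLongField": 1006}
```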
So I added the following to my Kafka source connector configuration, as described here:
output.json.formatter=com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson
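After adding it, my source connector works and I can see the expected output in "kafka-console-consumer.bat", but my sink connector no longer works: it does not send any data to Elasticsearch.
Can someone suggest a solution so that I get SimplifiedJson in Elasticsearch (i.e. without "$numberLong", "$numberInt")?
Edit: here is the log I get when the ElasticsearchSinkConnector fails: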
[2021-09-06 17:40:35,085] ERROR WorkerSinkTask{id=elasticsearch-sink-connector1111-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Indexing record failed. (org.apache.kafka.connect.runtime.WorkerSinkTask:607)
org.apache.kafka.connect.errors.ConnectException: Indexing record failed.
at io.confluent.connect.elasticsearch.ElasticsearchClient.handleMalformedDocResponse(ElasticsearchClient.java:455)
at io.confluent.connect.elasticsearch.ElasticsearchClient.handleResponse(ElasticsearchClient.java:400)
at io.confluent.connect.elasticsearch.ElasticsearchClient.access$200(ElasticsearchClient.java:65)
at io.confluent.connect.elasticsearch.ElasticsearchClient$1.afterBulk(ElasticsearchClient.java:326)
at org.elasticsearch.action.bulk.BulkRequestHandler$1.onResponse(BulkRequestHandler.java:66)
at org.elasticsearch.action.bulk.BulkRequestHandler$1.onResponse(BulkRequestHandler.java:62)
at org.elasticsearch.action.bulk.Retry$RetryHandler.finishHim(Retry.java:167)
at org.elasticsearch.action.bulk.Retry$RetryHandler.onResponse(Retry.java:114)
at org.elasticsearch.action.bulk.Retry$RetryHandler.onResponse(Retry.java:76)
at org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1577)
at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:572)
at org.elasticsearch.client.RestClient$1.completed(RestClient.java:307)
at org.elasticsearch.client.RestClient$1.completed(RestClient.java:301)
at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:181)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: ElasticsearchException[Elasticsearch exception [type=mapper_parsing_exception, reason=object mapping for [fullDocument.someLongField] tried to parse field [someLongField] as object, but found a concrete value]]
at org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:491)
at org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:402)
at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:139)
at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:199)
at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1706)
at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$10(RestHighLevelClient.java:1499)
... 19 more
[2021-09-06 17:40:35,090] ERROR WorkerSinkTask{id=elasticsearch-sink-connector1111-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:184)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:609)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Indexing record failed.
at io.confluent.connect.elasticsearch.ElasticsearchClient.handleMalformedDocResponse(ElasticsearchClient.java:455)
at io.confluent.connect.elasticsearch.ElasticsearchClient.handleResponse(ElasticsearchClient.java:400)
at io.confluent.connect.elasticsearch.ElasticsearchClient.access$200(ElasticsearchClient.java:65)
at io.confluent.connect.elasticsearch.ElasticsearchClient$1.afterBulk(ElasticsearchClient.java:326)
at org.elasticsearch.action.bulk.BulkRequestHandler$1.onResponse(BulkRequestHandler.java:66)
at org.elasticsearch.action.bulk.BulkRequestHandler$1.onResponse(BulkRequestHandler.java:62)
at org.elasticsearch.action.bulk.Retry$RetryHandler.finishHim(Retry.java:167)
at org.elasticsearch.action.bulk.Retry$RetryHandler.onResponse(Retry.java:114)
at org.elasticsearch.action.bulk.Retry$RetryHandler.onResponse(Retry.java:76)
at org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1577)
at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:572)
at org.elasticsearch.client.RestClient$1.completed(RestClient.java:307)
at org.elasticsearch.client.RestClient$1.completed(RestClient.java:301)
at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:181)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
... 1 more
Caused by: ElasticsearchException[Elasticsearch exception [type=mapper_parsing_exception, reason=object mapping for [fullDocument.someLongField] tried to parse field [someLongField] as object, but found a concrete value]]
at org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:491)
at org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:402)
at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:139)
at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:199)
at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1706)
at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$10(RestHighLevelClient.java:1499)
... 19 more
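The "Caused by" line points to a mapping conflict: the index apparently already maps `fullDocument.someLongField` as an object (presumably created by dynamic mapping from the earlier `{"$numberLong": "1006"}` documents), so a plain number can no longer be indexed into that field. The Python sketch below checks how a dotted field path is mapped in the parsed JSON returned by `GET /localdb/_mapping`. It assumes an Elasticsearch 7.x-style response with no mapping-type level; the helper name `mapping_kind` and the sample mapping are hypothetical, the sample being written to mirror what dynamic mapping would likely infer from the old documents.

```python
def mapping_kind(mapping_response, index, field_path):
    """Return how a dot-separated field path is mapped in `index`.

    Assumes an Elasticsearch 7.x-style GET /<index>/_mapping response, i.e.
    {index: {"mappings": {"properties": {...}}}} with no mapping-type level.
    Returns the field's "type", "object" for a field that only has
    sub-"properties", or None if the field is not mapped.
    """
    node = mapping_response[index]["mappings"]
    for part in field_path.split("."):
        props = node.get("properties", {})
        if part not in props:
            return None
        node = props[part]
    if "type" in node:
        return node["type"]
    return "object" if "properties" in node else None


# Hypothetical mapping, mirroring what dynamic mapping would likely infer from
# documents like {"fullDocument": {"someLongField": {"$numberLong": "1006"}}}.
sample = {
    "localdb": {
        "mappings": {
            "properties": {
                "fullDocument": {
                    "properties": {
                        "someLongField": {
                            "properties": {
                                "$numberLong": {
                                    "type": "text",
                                    "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}

print(mapping_kind(sample, "localdb", "fullDocument.someLongField"))  # object
```

If the real mapping reports "object" for this field, the index was built from the old Extended JSON documents. Elasticsearch does not allow changing an existing field's type in place, so simplified documents would need to go into a new or recreated index.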