0

我正在使用 mongodb kafka source connector v1.6 ... kafka connect 以分布式模式运行问题是来自 mongo db 的消息未发布到相应的 kafka 主题

日志:

INFO Opened connection [connectionId{localValue:7, serverValue:283461}] to DB (org.mongodb.driver.connection:71)
[2021-12-05 10:59:14,616] INFO Opened connection [connectionId{localValue:8, serverValue:283462}] to DB (org.mongodb.driver.connection:71)
[2021-12-05 10:59:16,021] INFO Copying existing data on the following namespaces: [ecaf-staging.augmentPlanRelationship, ecaf-staging.augmentPlan, ecaf-staging.device, ecaf-staging.location] (com.mongodb.kafka.connect.source.MongoCopyDataManager:104)
[2021-12-05 10:59:16,035] INFO Started MongoDB source task (com.mongodb.kafka.connect.source.MongoSourceTask:203)
[2021-12-05 10:59:16,036] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:225)
[2021-12-05 10:59:16,386] INFO Opened connection [connectionId{localValue:9, serverValue:283463}] to DB (org.mongodb.driver.connection:71)
[2021-12-05 10:59:16,394] INFO Opened connection [connectionId{localValue:10, serverValue:283464}] to DB (org.mongodb.driver.connection:71)
[2021-12-05 10:59:24,042] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 10:59:31,037] INFO Shutting down executors (com.mongodb.kafka.connect.source.MongoSourceTask:604)
[2021-12-05 10:59:31,037] INFO Finished copying existing data from the collection(s). (com.mongodb.kafka.connect.source.MongoSourceTask:611)
[2021-12-05 10:59:31,038] INFO Watching for database changes on 'ecaf-staging' (com.mongodb.kafka.connect.source.MongoSourceTask:677)
[2021-12-05 10:59:31,066] INFO Resuming the change stream after the previous offset: {"_data": "8261AC9B83000023282B0229296E04"} (com.mongodb.kafka.connect.source.MongoSourceTask:415)
[2021-12-05 10:59:34,043] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 10:59:44,044] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 10:59:54,045] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 11:00:04,045] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 11:00:14,053] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 11:00:24,053] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 11:00:34,054] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 11:00:44,055] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)

下面是我的配置文件:

{
"name":"mongo-DB",
"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"tasks.max":"1",
"connection.uri":"",
"database":"DB",
"copy.existing":"true",
"copy.existing.namespace.regex":"DB.augmentPlan$|DB.device$|DB.location$|DB.augmentPlanRelationship",
"topic.namespace.map":"{\"DB.augmentPlan\\\" : \"ecaf-cdc-augment-plans\",\"DB.augmentPlanRelationship\\\" : \"ecaf-cdc-augment-plan-mappings\",\"DB.device\\\" : \"ecaf-cdc-devices\",\"DB.location\\\" : \"ecaf-cdc-locations\"}",
"poll.max.batch.size":"1000",
"poll.await.time.ms":"5000",
"pipeline":"[{\"$match\":{\"ns.coll\": {\"$regex\": \"\/^(DB.augmentPlan|DB.device|DB.location)$\/\"}}}]",
"batch.size":"1",
"change.stream.full.document":"updateLookup",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"false",
"publish.full.document.only":"true"

}

请帮我修复它

4

0 回答 0