我正在使用以下源连接器配置来过滤和读取来自 MongoDB 的状态为“PENDING”的特定记录。只需要轮询所有插入/更新为 PENDING 状态的记录。如果排除管道,源连接器能够轮询所有记录。有人可以帮我理解如何解决这个问题,还有没有办法知道轮询已经完成,就像批处理作业完成一样,以协调 kafka 消费者的另一个进程?
name=mongo-source-demo
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
# Connection and source configuration
connection.uri=mongodb://username:password@hostname:27017
database=test
collection=mongoDBtest
topic.prefix=mongodb.connector
poll.max.batch.size=1000
poll.await.time.ms=100000
publish.full.document.only=true
pipeline=[{"$match": { "Status" : "PENDING" }},{"$project":{"_id":1,"fullDocument":1}} ]