我正在尝试使用 Mongo Kafka 连接器捕获 MongoDb 更改数据。当我将集合名称(即as)放入时它可以工作collection=collection1
,但是当我将集合留空并使用以下内容时我无法让它工作pipeline
[{"$match":{"operationType":{"$in":["insert","update","replace","delete"]}}}]
pipeline=[{"$match": {"ns.coll": {"$regex": /^(collection1|collection2)$/}}}]
这是属性文件的样子:
name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
# Connection and source configuration
connection.uri=mongodb://mongo1:27017,mongo2:27017,mongo3:27017
database=test
collection=
topic.prefix=
poll.max.batch.size=1000
poll.await.time.ms=5000
# Change stream options
# pipeline=[{"$match":{"operationType":{"$in":["insert","update","replace","delete"]}}}]
pipeline=[{"$match": {"ns.coll": {"$regex": /^(collection1|collection2)$/}}}]
batch.size=0
publish.full.document.only=true
change.stream.full.document=updateLookup
collation=
我从运行中收到以下消息bin/connect-standalone.sh
:
WARN Failed to resume change stream: {aggregate: 1} is not valid for '$changeStream'; a collection is required. 73
我正在使用 mongodbv3.6