0

我正在尝试使用 Mongo Kafka 连接器捕获 MongoDb 更改数据。当我将集合名称(即as)放入时它可以工作collection=collection1,但是当我将集合留空并使用以下内容时我无法让它工作pipeline[{"$match":{"operationType":{"$in":["insert","update","replace","delete"]}}}]pipeline=[{"$match": {"ns.coll": {"$regex": /^(collection1|collection2)$/}}}]

这是属性文件的样子:

name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1

# Connection and source configuration
connection.uri=mongodb://mongo1:27017,mongo2:27017,mongo3:27017
database=test
collection=

topic.prefix=
poll.max.batch.size=1000
poll.await.time.ms=5000

# Change stream options
# pipeline=[{"$match":{"operationType":{"$in":["insert","update","replace","delete"]}}}]
pipeline=[{"$match": {"ns.coll": {"$regex": /^(collection1|collection2)$/}}}]

batch.size=0
publish.full.document.only=true
change.stream.full.document=updateLookup
collation=

我从运行中收到以下消息bin/connect-standalone.sh

WARN Failed to resume change stream: {aggregate: 1} is not valid for '$changeStream'; a collection is required. 73

我正在使用 mongodbv3.6

4

1 回答 1

0

如果您指定一个database参数,则连接器希望您也提供一个collection参数。

事实上,如果使用 Mongo 3.6,我相信您一次只能收听一个数据库/集合组合。这在 Mongo 4.0 中发生了变化,如下所示:https ://docs.mongodb.com/manual/release-notes/4.0/#change-streams 。

于 2020-12-22T20:57:36.877 回答