我正在使用源连接器来观察 Mongo 集合中的变化并将它们带到 Kafka 主题。如果满足特定条件(名称= Kathe),这很好,直到我添加了将它们放入 Kafka 主题的要求。这意味着只要更新过程将名称更改为 Kathe,我就需要将数据放入主题中。
我的连接器的配置如下所示:
{
"connection.uri":"xxxxxx",
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false",
"topic.prefix": "qu",
"database":"sample_analytics",
"collection":"customers",
"copy.existing": "true",
"pipeline":"[{\"$match\":{\"name\":\"Kathe\"}}]",
"publish.full.document.only": "true",
"flush.timeout.ms":"15000"
}
我也试过
"pipeline":"[{\"$match\":{\"name\":{ \"$eq\":\"Kathe\"}}}]"
但是当条件满足时,它不会产生消息。
我犯错了吗?