0

我正在使用源连接器来观察 Mongo 集合中的变化并将它们带到 Kafka 主题。如果满足特定条件(名称= Kathe),这很好,直到我添加了将它们放入 Kafka 主题的要求。这意味着只要更新过程将名称更改为 Kathe,我就需要将数据放入主题中。

我的连接器的配置如下所示:

{
    "connection.uri":"xxxxxx",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable":"false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable":"false",
    "topic.prefix": "qu",
    "database":"sample_analytics",
    "collection":"customers",
    "copy.existing": "true", 
    "pipeline":"[{\"$match\":{\"name\":\"Kathe\"}}]",   
    "publish.full.document.only": "true",
    "flush.timeout.ms":"15000"
}

我也试过

"pipeline":"[{\"$match\":{\"name\":{ \"$eq\":\"Kathe\"}}}]"

但是当条件满足时,它不会产生消息。

我犯错了吗?

4

0 回答 0