1

发布第一个插入:

{"Customer_id": 2, "transaction_id": "1", "idd": [999, 1111], "id": 1}

然后是第二个:

{"Customer_id": 2, "transaction_id": "2", "idd": [9, 10], "id": 1}

要求的结果:

{"Customer_id": 2, "transaction_id": "2", "idd": [[9, 10] , [999, 1111]], "id": 1}

我得到了什么:

{"Customer_id": 2, "transaction_id": "2", "idd": [9, 10] , "id": 1}

更新策略此更新数组不追加,但我的预期结果追加数组

配置 :

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
value.projection.list=customer_id,transaction_id
value.projection.type=whitelist
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy

我们如何通过 mongo-kafka sink 连接器做到这一点

4

1 回答 1

0

看起来您正在尝试聚合流数据

除非 MongoDB 能够通过插入相同的文档 ID 以某种方式提供此功能(我怀疑这是因为它怎么知道您只想收集 idd 字段?),否则您将不得不使用 Kafka Streams / KSQL 或其他有状态处理层, 汇总您的值。接收器连接器只是将它看到的任何东西转发到数据库中;连接器不知道以前的记录

于 2019-12-31T14:08:56.523 回答