我同时使用 S3 和 JDBC 接收器连接器,并且在存储数据时遇到了一些奇怪的行为。为了进行一些协调,我真的很想将 Kafka 摄取时间或记录生成时间保留到存储在 Sink 系统中的数据中。
我正在查看文档,但没有找到。我正在使用 Confluent 连接器,但如果允许我这样做,我也可以使用其他连接器,例如 Camel。
有人可以给我一些指示吗?
更新:根据 onecricketeer 的良好反馈,我知道我应该看看这个: https ://docs.confluent.io/5.5.0/connect/transforms/insertfield.html#insertfield
而且我还看到了这个例子: Kafka连接消费者引用偏移量并存储在消息中
我会测试它,但我是否理解正确,例如理论上我可以做这样的事情:
"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.offset.field": "recordOffset"
"transforms.InsertField.partition.field": "recordPartition"
"transforms.InsertField.timestamp.field": "recordTimestamp"
这将在记录中为我创建 3 个新属性,称为 recordOffset、recordPartition 和 recordTimestamp,其中包含所描述的值。
如果我想确保这些值总是存在或失败,我需要做(不确定我是否理解后缀部分):
"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"!transforms.InsertField.offset.field": "recordOffset"
"!transforms.InsertField.partition.field": "recordPartition"
"!transforms.InsertField.timestamp.field": "recordTimestamp"