0

我同时使用 S3 和 JDBC 接收器连接器,并且在存储数据时遇到了一些奇怪的行为。为了进行一些协调,我真的很想将 Kafka 摄取时间或记录生成时间保留到存储在 Sink 系统中的数据中。

我正在查看文档,但没有找到。我正在使用 Confluent 连接器,但如果允许我这样做,我也可以使用其他连接器,例如 Camel。

有人可以给我一些指示吗?

更新:根据 onecricketeer 的良好反馈,我知道我应该看看这个: https ://docs.confluent.io/5.5.0/connect/transforms/insertfield.html#insertfield

而且我还看到了这个例子: Kafka连接消费者引用偏移量并存储在消息中

我会测试它,但我是否理解正确,例如理论上我可以做这样的事情:

"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.offset.field": "recordOffset"
"transforms.InsertField.partition.field": "recordPartition"
"transforms.InsertField.timestamp.field": "recordTimestamp"

这将在记录中为我创建 3 个新属性,称为 recordOffset、recordPartition 和 recordTimestamp,其中包含所描述的值。

如果我想确保这些值总是存在或失败,我需要做(不确定我是否理解后缀部分):

"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"!transforms.InsertField.offset.field": "recordOffset"
"!transforms.InsertField.partition.field": "recordPartition"
"!transforms.InsertField.timestamp.field": "recordTimestamp"
4

1 回答 1

2

如前所述@OneCricketeerInsertField单消息转换在这里完成了工作。这是使用它的示例 S3 接收器配置:

{
          "connector.class"        : "io.confluent.connect.s3.S3SinkConnector",
          "storage.class"          : "io.confluent.connect.s3.storage.S3Storage",
          "s3.region"              : "us-west-2",
          "s3.bucket.name"         : "rmoff-smt-demo-01",
          "topics"                 : "customers,transactions",
          "tasks.max"              : "4",
          "flush.size"             : "16",
          "format.class"           : "io.confluent.connect.s3.format.json.JsonFormat",
          "schema.generator.class" : "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
          "schema.compatibility"   : "NONE",
          "partitioner.class"      : "io.confluent.connect.storage.partitioner.DefaultPartitioner",
          "transforms"                          : "insertTS,formatTS",
          "transforms.insertTS.type"            : "org.apache.kafka.connect.transforms.InsertField$Value",
          "transforms.insertTS.timestamp.field" : "messageTS",
          "transforms.formatTS.type"            : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
          "transforms.formatTS.format"          : "yyyy-MM-dd HH:mm:ss:SSS",
          "transforms.formatTS.field"           : "messageTS",
          "transforms.formatTS.target.type"     : "string"        
        }

请注意,它还用于TimestampConverter格式化字符串中的时间戳 - 默认情况下它是一个纪元。

你的问题促使我正确地写了这篇文章并记录了一个小教程 - 你可以在这里看到它:https ://youtu.be/3Gj_SoyuTYk

于 2020-12-08T22:52:26.640 回答