0

我有一个 mongo sink 连接器以及一个模式注册表。

我将 mongo sink 连接器配置为访问类似于:https ://github.com/mongodb/mongo-kafka/blob/master/docs/sink.md#configuration-example-for-avro 的模式注册表

我按照以下方式创建了一个模式:https ://github.com/mongodb/mongo-kafka/blob/master/docs/sink.md#logical-types 。它看起来像这样:

{
  "type": "record",
  "name": "MyLogicalTypesRecord",
  "namespace": "com.mongodb.kafka.data.kafka.avro",
  "fields": [
    {
      "name": "myTimestampMillisField",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}

但是,当记录通过时,数据看起来像这样:{ "myTimestampMillisField": 1572035138104 },而不是类似于 this 的东西{ "myTimestampMillisField": ISODate("2019-10-25T20:28:19.628Z") }

我检查了模式注册表以确保逻辑类型在那里并且看起来不错。

我不确定我做错了什么,或者是否有更好的方法在 mongo 中设置为 Date 类型。有任何想法吗?

4

1 回答 1

1

我想到了。

https://docs.confluent.io/current/connect/transforms/timestampconverter.html

"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.field": "myTimestampMillisField",
"transforms.TimestampConverter.target.type": "Date"
于 2019-10-25T21:04:20.687 回答