0

我已经设置了一个非常简单的mongo kafka 源连接器来将 mongo 的 oplog 流式传输到 kafka。但是,我看到在连接器发布的消息中,序列化的 oplog 事件不遵守扩展的 JSON 规范;例如,日期时间字段表示为:

{"$date": 1597841586927}

当规范说它应该被格式化为:

{"$date": {"$numberLong": "1597841586927"}}

为什么我没有得到干净的扩展 JSON?

注意:我的连接器配置文件如下所示:

{
  "name": "mongosource",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "tasks.max": 1,
    "connection.uri": "...",
    "topic.prefix":"mongosource",
    "database": "mydb",
    "copy.existing": true,
    "change.stream.full.document": "updateLookup",
  }
}
4

1 回答 1

0

源连接器的默认 json 格式化程序是旧的(请参阅连接器的 JIRA 项目上的这个问题)。

在此连接器的版本1.3.0中,您可以添加一个新的配置选项以要求连接器输出正确的扩展 JSON:

"output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.ExtendedJson"
于 2020-09-29T16:39:32.547 回答