1

我正在 confluent.cloud(Kafka) 中创建弹性接收器连接器。在这里,如果我使用变换,它就不起作用。confluent.cloud(kafka) 会支持转换吗?

下面是我的脚本,

{
      "name": "ElasticConnectorTest5",
      "config": {
      "topics": "enterprise.contact6.model",
      "input.data.format": "AVRO",
      "connector.class": "ElasticsearchSink",
      "name": "ElasticConnectorTest5",
      "kafka.api.key": "xxxxxxxxxxxxx",
      "kafka.api.secret": "xxxxxxxxxxxxxxxxxx",
      "connection.url": "url",
      "connection.username": "**********",
      "connection.password": "*********",
      "type.name": "_doc",
      "key.ignore": "false",
      "schema.ignore": "true",
      "tasks.max": "1",
      "errors.tolerance": "all",
      "errors.log.enable": "true",
      "errors.log.include.messages": "true",
      "behavior.on.malformed.documents": "warn",
      "transforms": "InsertMetadata,IndexName",
      "transforms.InsertMetadata.type": "org.apache.kafka.connect.transforms.InsertField$Value",
      "transforms.InsertMetadata.partition.field": "partition",
      "transforms.InsertMetadata.offset.field": "offset",
      "transforms.IndexName.type": "org.apache.kafka.connect.transforms.RegexRouter",
      "transforms.IndexName.regex": "enterprise.(.*)",
      "transforms.IndexName.replacement": "es.$1"
  }
}
4

1 回答 1

2

Confluent Cloud 现在支持单消息转换 (SMT)(2021 年 10 月):

我们很高兴地宣布,我们的大多数Confluent Cloud 连接器现在都支持单个消息转换、连接器日志事件和连接器数据输出预览。

请注意,公告中有“大多数”一词,尚不支持某些转换

托管连接器当前不支持 RegexRouter SMT。一些接收器连接器不支持以下转换。此限制在每个受影响的连接器的连接器快速入门中都有说明。

    org.apache.kafka.connect.transforms.TimestampRouter
    io.confluent.connect.transforms.MessageTimestampRouter
    io.confluent.connect.transforms.ExtractTopic$Key
    io.confluent.connect.transforms.ExtractTopic$Value

以及Elasticsearch Service Sink Connector for Confluent Cloud 的一些特定限制

于 2021-10-29T13:48:00.507 回答