
When we write to S3 through the S3 sink connector with the default configuration, everything works without issue. But when we try hourly partitioning, it fails with the error below. Please see the two configurations and the error message and help us resolve this issue.

Default:

{
  "value.converter.schemas.enable": "false",
  "name": "tibconew1-test-s3standard-default-sink-connector",
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "tasks.max": "2",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  "errors.tolerance": "all",
  "topics": [
    "test.s3custom.default.dax.shipment.data",
    "test.s3custom.default.dax.shipment.data",
    "test.s3custom.hourly.onprem.tibco.dax_shipment.dpp_asn"
  ],
  "topics.regex": "",
  "errors.deadletterqueue.topic.name": "dlq_test.s3custom.default.dax.shipment.data",
  "errors.deadletterqueue.context.headers.enable": "true",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "flush.size": "1000",
  "s3.bucket.name": "test-stg-raw",
  "s3.region": "us-east-1",
  "s3.credentials.provider.class": "com.amazonaws.auth.InstanceProfileCredentialsProvider",
  "s3.acl.canned": "bucket-owner-full-control",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "topics.dir": "streams_dir",
  "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner"
}

Hourly:

{
  "value.converter.schema.registry.url": "https://confschema.test-dsol-core.testdigital-stg.com",
  "value.converter.schemas.enable": "false",
  "name": "test.s3custom.hourly.tibco.dax_shipment.dpp_asn.sink-connector",
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "tasks.max": "2",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "errors.tolerance": "all",
  "topics": [
    "test.s3custom.hourly.onprem.tibco.dax_shipment.dpp_asn"
  ],
  "topics.regex": "",
  "errors.deadletterqueue.topic.name": "dlq_test.s3custom.hourly.onprem.tibco.dax_shipment.dpp_asn.sink",
  "errors.deadletterqueue.context.headers.enable": "true",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "flush.size": "10",
  "s3.bucket.name": "test-stg-raw",
  "s3.region": "us-east-1",
  "s3.credentials.provider.class": "com.amazonaws.auth.InstanceProfileCredentialsProvider",
  "s3.acl.canned": "bucket-owner-full-control",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "topics.dir": "streams_dir",
  "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
  "locale": "en-US",
  "timezone": "America/Chicago",
  "timestamp.extractor": "RecordField",
  "timestamp.field": "DPP_ASN.LST_UPDT_TS"
} 

Error:

[error message was attached as a screenshot and is not reproduced here]


1 Answer


Finally we found the cause: the timestamp received in the payload was in an invalid format, containing an extra space. The RecordField timestamp extractor parses string values as ISO 8601 timestamps, so the space between the date and the time (where ISO 8601 expects a "T") made the value unparseable. We corrected the format on the source side. For the hourly partitioner, the connector expects a value it can bucket by hour. Hourly partitioning:

io.confluent.connect.storage.partitioner.HourlyPartitioner is equivalent to the TimeBasedPartitioner with path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH and partition.duration.ms=3600000 (one hour).
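
In other words, the failing connector could equivalently have declared the partitioner explicitly. A minimal sketch of just the partitioner-related keys (all other keys stay as in the hourly config above):

{
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
  "partition.duration.ms": "3600000",
  "locale": "en-US",
  "timezone": "America/Chicago",
  "timestamp.extractor": "RecordField",
  "timestamp.field": "DPP_ASN.LST_UPDT_TS"
}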

The message was: "LST_UPDT_TS":"2021-02-01 07:16:23.567"

Corrected to: "LST_UPDT_TS":"2015-08-01T17:00:00.69243-05:00"
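
For reference, a minimal Java sketch (not the connector's code; it assumes the source emits the space-separated pattern shown above) of how the source side could normalize such a value to ISO 8601 with a UTC offset:

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class TimestampFix {
    public static void main(String[] args) {
        // Payload value as originally produced: date and time separated
        // by a space, which an ISO 8601 parser rejects.
        String raw = "2021-02-01 07:16:23.567";

        // Parse with an explicit pattern matching the source format.
        LocalDateTime local = LocalDateTime.parse(
                raw, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));

        // Attach the connector's timezone and emit ISO 8601 with an
        // offset, e.g. 2021-02-01T07:16:23.567-06:00
        String iso = local.atZone(ZoneId.of("America/Chicago"))
                          .toOffsetDateTime()
                          .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);

        System.out.println(iso);
    }
}

Once the field is in this shape, the RecordField extractor can parse it and the HourlyPartitioner can place each record under the correct year=/month=/day=/hour= path.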

Answered 2021-03-01T21:04:58.280