
I am using Confluent's S3 sink connector (from the Confluent Kafka distribution) on a vanilla Kafka Connect setup (v5.2.1).

Upstream, MySQL CDC events are written to a Kafka topic as JSON (via Maxwell), with no schema attached. This Kafka connector reads from that Apache Kafka cluster and writes the data to S3.
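
For reference, a Maxwell event on this topic would look roughly like the following (a sketch based on Maxwell's documented output shape; the table and field values here are made up for illustration):

{
"database": "tickledb",
"table": "audit",
"type": "insert",
"ts": 1621585708,
"xid": 23396,
"data": {"id": 42, "action": "login"}
}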

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
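
Since Maxwell emits schemaless JSON, these converters presumably run with the JSON schema envelope disabled; for completeness, the matching worker-level settings would be (a sketch, assuming defaults everywhere else):

key.converter.schemas.enable=false
value.converter.schemas.enable=false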

I am running the connector in distributed mode with the following connector configuration:

{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"behavior.on.null.values": "ignore",
"s3.region": "ap-southeast-1",
"flush.size": "1000",
"schema.compatibility": "NONE",
"topics": "audit",
"tasks.max": "3",
"s3.part.size": "5242880",
"timezone": "UTC",
"locale": "en_US_POSIX",
"retry.backoff.ms": "100",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
"name": "s3-sink-connector-um-cdc-events",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "tickledb/prod/audit",
"timestamp.extractor": "Record",
"s3.retry.backoff.ms": "100",
"rotate.schedule.interval.ms": "10000"

}
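
For reference, in distributed mode this configuration is submitted through the Connect REST API; a sketch, assuming the worker's REST interface is on localhost:8083 and the JSON above is saved as s3-sink.json:

curl -X PUT -H "Content-Type: application/json" \
     --data @s3-sink.json \
     http://localhost:8083/connectors/s3-sink-connector-um-cdc-events/config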

While this generally works, I keep seeing the following error:

task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:484)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:344)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:484)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('d' (code 100)): Expected space separating root-level values
 at [Source: (byte[])"3d5b807d-a6c7-43da-bbcc-d40efe9753a7"; line: 1, column: 3]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('d' (code 100)): Expected space separating root-level values
 at [Source: (byte[])"3d5b807d-a6c7-43da-bbcc-d40efe9753a7"; line: 1, column: 3]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:693)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:591)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:638)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyRootSpace(UTF8StreamJsonParser.java:1635)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1375)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:830)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:729)
    at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4042)
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2571)
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:342)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:484)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:484)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

The failure is a parse error inside Jackson. The data comes from MySQL CDC events, so changing the data itself is not an option (not without a breaking change). If Kafka accepts these records as JSON, they should be equally valid for the Kafka connector.
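
Worth noting: the failing bytes in the trace ("3d5b807d-a6c7-43da-bbcc-d40efe9753a7") look like a record key, and a bare UUID is not a valid root-level JSON document, so the JsonConverter will reject it regardless of what the value looks like. A minimal sketch that reproduces the same parse failure with Jackson alone:

import com.fasterxml.jackson.databind.ObjectMapper;

import java.nio.charset.StandardCharsets;

public class ReproduceParseError {
    public static void main(String[] args) throws Exception {
        // The exact bytes from the stack trace: a bare UUID, not a JSON document.
        byte[] key = "3d5b807d-a6c7-43da-bbcc-d40efe9753a7"
                .getBytes(StandardCharsets.UTF_8);
        // readTree() is what Connect's JsonDeserializer calls under the hood;
        // this throws JsonParseException: Unexpected character ('d' (code 100)).
        new ObjectMapper().readTree(key);
    }
}

If that is indeed what is happening, pointing key.converter at org.apache.kafka.connect.storage.StringConverter (while keeping JsonConverter for values) would be one way to stop treating keys as JSON.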

1 Answer

I did not find the cause of the failure, but I updated the connector configuration to keep the task from going into the FAILED state and to capture the actual messages that cause the failures.

Updated configuration fields:

"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "audit_dlq_sink_00",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.log.include.messages": "true",
"errors.log.include.messages": "true",
"errors.log.enable": "true",

With this updated configuration, a parse failure no longer stops my task; the offending message is routed to the configured DLQ instead. Note, however, that the DLQ record body contains only the message that could not be parsed, not the error itself.
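
That said, because "errors.deadletterqueue.context.headers.enable" is set to "true", Connect attaches the failure context (original topic/partition/offset, exception class, message, and stack trace) to each DLQ record as headers prefixed with __connect.errors. A minimal sketch of a consumer that dumps those headers (the broker address and group id here are assumptions):

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

public class DlqHeaderDump {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("group.id", "dlq-inspector");           // arbitrary group id
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("audit_dlq_sink_00"));
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(10))) {
                // The failure context travels as record headers (__connect.errors.*).
                for (Header h : record.headers()) {
                    System.out.println(h.key() + " = "
                            + new String(h.value(), StandardCharsets.UTF_8));
                }
                System.out.println("payload = "
                        + new String(record.value(), StandardCharsets.UTF_8));
            }
        }
    }
}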

answered 2021-05-21 at 08:28