0

当我尝试使用 JDBC Sink 连接器(Kafka Connect)从 Kafka 主题轮询消息时,我遇到了数组类型记录的以下错误。

错误:

“trace”:“org.apache.kafka.connect.errors.ConnectException:在错误处理程序 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206) org.apache.kafka 中超出容差。 connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132) org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50) org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord( WorkerSinkTask.java:520) org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473) org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328) org.apache。 kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182) org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231) java.base/java.util. concurrent.Executors$RunnableAdapter.call(Executors.java:515) java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor. java:1128) java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) java.base/java.lang.Thread.run(Thread.java:829)引起:ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) java.base/java.lang.Thread.run(Thread.java:829)造成的:ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) java.base/java.lang.Thread.run(Thread.java:829)造成的:org.apache.kafka.connect.errors.DataException:展平转换不支持使用模式记录的 ARRAY(对于字段 RawASROutput.RawOutput)。org.apache.kafka.connect.transforms.Flatten.buildUpdatedSchema(Flatten.java:198) org.apache.kafka.connect.transforms.Flatten.buildUpdatedSchema(Flatten.java:195) org.apache.kafka.connect.transforms。 Flatten.applyWithSchema(Flatten.java:148) org.apache.kafka.connect.transforms.Flatten.apply(Flatten.java:77) org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java :50) org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156) org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)\n\ t... 14 更多"

已安装的 Kafka Connect 插件:

http://localhost:8083/

[
  {
    "class": "io.confluent.connect.jdbc.JdbcSinkConnector_Flatten",
    "type": "sink",
    "version": "5.5.0"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "type": "source",
    "version": "5.5.0"
  },
  {
    "class": "io.mdrogalis.voluble.VolubleSourceConnector",
    "type": "source",
    "version": "0.2.0"
  },
....

JDBC 同步连接器配置如下所示:

curl -X PUT http://localhost:8083/connectors/msme_asr_output/config \
     -H "Content-Type: application/json" -d '{
    "connector.class"                       : "io.confluent.connect.jdbc.JdbcSinkConnector_Flatten",
    "topics"                                : "test-output",
    "value.converter"                       : "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schemas.enable"          : "false",
    "value.converter.schemas.enable"        : "true",
    "key.converter"                         : "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schema.registry.url"   : "http://schema-registry:8081",
    "auto.create"                           : true,
    "auto.evolve"                           : true,
    "table.name.format"                     : "test_output_table",
    "insert.mode"                           : "insert",
    "errors.log.enable"                     : true,
    "errors.log.include.messages"           : true,
    "flatten"                               : "true",
    "transforms"                            : "flatten",
    "transforms.flatten.type"               : "org.apache.kafka.connect.transforms.Flatten$Value
    ....
}'

我已经安装了以下连接器来制作扁平数组类型的对象

confluent-hub 安装 norsktipping/kafka-connect-jdbc_flatten:5.5.0 ( https://github.com/gertschouten/kafka-connect-jdbc-flatten )

谢谢!

4

0 回答 0