当我尝试使用 JDBC Sink 连接器(Kafka Connect)从 Kafka 主题轮询消息时,我遇到了数组类型记录的以下错误。
错误:
“trace”:“org.apache.kafka.connect.errors.ConnectException:在错误处理程序 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206) org.apache.kafka 中超出容差。 connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132) org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50) org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord( WorkerSinkTask.java:520) org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473) org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328) org.apache。 kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182) org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231) java.base/java.util. concurrent.Executors$RunnableAdapter.call(Executors.java:515) java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor. java:1128) java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) java.base/java.lang.Thread.run(Thread.java:829)引起:ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) java.base/java.lang.Thread.run(Thread.java:829)造成的:ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) java.base/java.lang.Thread.run(Thread.java:829)造成的:org.apache.kafka.connect.errors.DataException:展平转换不支持使用模式记录的 ARRAY(对于字段 RawASROutput.RawOutput)。org.apache.kafka.connect.transforms.Flatten.buildUpdatedSchema(Flatten.java:198) org.apache.kafka.connect.transforms.Flatten.buildUpdatedSchema(Flatten.java:195) org.apache.kafka.connect.transforms。 Flatten.applyWithSchema(Flatten.java:148) org.apache.kafka.connect.transforms.Flatten.apply(Flatten.java:77) org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java :50) org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156) org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)\n\ t... 14 更多"
已安装的 Kafka Connect 插件:
http://localhost:8083/
[
{
"class": "io.confluent.connect.jdbc.JdbcSinkConnector_Flatten",
"type": "sink",
"version": "5.5.0"
},
{
"class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"type": "source",
"version": "5.5.0"
},
{
"class": "io.mdrogalis.voluble.VolubleSourceConnector",
"type": "source",
"version": "0.2.0"
},
....
JDBC 同步连接器配置如下所示:
curl -X PUT http://localhost:8083/connectors/msme_asr_output/config \
-H "Content-Type: application/json" -d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector_Flatten",
"topics" : "test-output",
"value.converter" : "io.confluent.connect.protobuf.ProtobufConverter",
"key.converter.schemas.enable" : "false",
"value.converter.schemas.enable" : "true",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"value.converter.schema.registry.url" : "http://schema-registry:8081",
"auto.create" : true,
"auto.evolve" : true,
"table.name.format" : "test_output_table",
"insert.mode" : "insert",
"errors.log.enable" : true,
"errors.log.include.messages" : true,
"flatten" : "true",
"transforms" : "flatten",
"transforms.flatten.type" : "org.apache.kafka.connect.transforms.Flatten$Value
....
}'
我已经安装了以下连接器来制作扁平数组类型的对象
confluent-hub 安装 norsktipping/kafka-connect-jdbc_flatten:5.5.0 ( https://github.com/gertschouten/kafka-connect-jdbc-flatten )
谢谢!