将 kafka JDBC 连接器运行到 PSQL 时出现以下错误:
带有 schemas.enable 的 JsonConverter 需要“schema”和“payload”字段,并且可能不包含其他字段。如果您尝试反序列化纯 JSON 数据,请在转换器配置中设置 schemas.enable=false。
但是,我的主题包含以下消息结构,其中添加了一个模式,就像它在网上展示的一样:
rowtime: 2022/02/04 12:45:48.520 Z, key: , value: "{" schema ": {"type": "struct", "fields": [{"type": "int", "field ": "ID", "optional": false}, {"type": "date", "field": "Date", "optional": false}, {"type": "varchar", "field": “ICD”,“可选”:false},{“type”:“int”,“field”:“CPT”,“optional”:false},{“type”:“double”,“field”:“Cost” ", "可选": false}], "可选": false, "name": "test"}, " payload ": {"ID": "24427934", "Date": "2019-05-22”,“ICD”:“883.436”,“CPT”:“60502”,“成本”:“1374.36”}}”,分区:0
我对连接器的配置是:
curl -X PUT http://localhost:8083/connectors/claim_test/config \
-H "Content-Type: application/json" \
-d '{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:postgresql://localhost:5432/ae2772",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"true",
"topics":"test_7",
"auto.create":"true",
"insert.mode":"insert"
}'
经过一些更改,我现在收到以下消息:
WorkerSinkTask{id=claim_test} Error converting message value in topic 'test_9' partition 0 at offset 0 and timestamp 1644005137197: Unknown schema type: int