0

将 kafka JDBC 连接器运行到 PSQL 时出现以下错误:

带有 schemas.enable 的 JsonConverter 需要“schema”和“payload”字段,并且可能不包含其他字段。如果您尝试反序列化纯 JSON 数据,请在转换器配置中设置 schemas.enable=false。

但是,我的主题包含以下消息结构,其中添加了一个模式,就像它在网上展示的一样:

rowtime: 2022/02/04 12:45:48.520 Z, key: , value: "{" schema ": {"type": "struct", "fields": [{"type": "int", "field ": "ID", "optional": false}, {"type": "date", "field": "Date", "optional": false}, {"type": "varchar", "field": “ICD”,“可选”:false},{“type”:“int”,“field”:“CPT”,“optional”:false},{“type”:“double”,“field”:“Cost” ", "可选": false}], "可选": false, "name": "test"}, " payload ": {"ID": "24427934", "Date": "2019-05-22”,“ICD”:“883.436”,“CPT”:“60502”,“成本”:“1374.36”}}”,分区:0

我对连接器的配置是:

 curl -X PUT http://localhost:8083/connectors/claim_test/config \
    -H "Content-Type: application/json" \
    -d '{
     "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
     "connection.url":"jdbc:postgresql://localhost:5432/ae2772",
     "key.converter":"org.apache.kafka.connect.json.JsonConverter",
     "value.converter":"org.apache.kafka.connect.json.JsonConverter",
     "value.converter.schemas.enable":"true",
     "topics":"test_7",
     "auto.create":"true",
     "insert.mode":"insert"
    }'

经过一些更改,我现在收到以下消息:

WorkerSinkTask{id=claim_test} Error converting message value in topic 'test_9' partition 0 at offset 0 and timestamp 1644005137197: Unknown schema type: int

4

1 回答 1

0

int不是有效的架构类型。应该是int8, int16, int32, 或int64.

同样,datevarchar也是double无效的。

JSON 中使用的类型与 Postgres 或任何特定于 SQL 的类型不同(日期应转换为 Unix Epochint64时间或设为 a string)。

您可以在此处找到支持的模式类型:https ://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Schema.java

于 2022-02-04T22:32:16.710 回答