我在使用 confluent 时遇到问题,我创建了一个以 ID 作为增量列的 jdbc 源,以及一个 HDFS 接收器来将数据写入 Hive。
毕竟,我使用Hive浏览Hive中的数据,我可以看到Oracle中为“int”的ID列在Hive中变成了“binary”,而Oracle中为“date”的时间列是在 Hive 中变成了“时间戳”。
以下是模式寄存器的详细信息:
{
\"type\": \"record\",
\"name\": \"GAYS_KAKOUXINXI\",
\"fields\": [
{
\"name\": \"ID\",
\"type\": [
\"null\",
{
\"type\": \"bytes\",
\"connect.version\": 1,
\"connect.parameters\": {
\"scale\": \"0\"
},
\"connect.name\": \"org.apache.kafka.connect.data.Decimal\"
}
]
},
{
\"name\": \"DEP_ACTION_FLAG\",
\"type\": [
\"null\",
\"string\"
]
},
{
\"name\": \"SBMC\",
\"type\": [
\"null\",
\"string\"
]
},
{
\"name\": \"FXMC\",
\"type\": [
\"null\",
\"string\"
]
},
{
\"name\": \"FXLX\",
\"type\": [
\"null\",
\"string\"
]
},
{
\"name\": \"DD\",
\"type\": [
\"null\",
\"string\"
]
},
{
\"name\": \"KKJD\",
\"type\": [
\"null\",
\"string\"
]
},
{
\"name\": \"DEP_ACTION_TIME\",
\"type\": [
\"null\",
{
\"type\": \"long\",
\"connect.version\": 1,
\"connect.name\": \"org.apache.kafka.connect.data.Timestamp\"
}
]
},
{
\"name\": \"KKMC\",
\"type\": [
\"null\",
\"string\"
]
},
{
\"name\": \"GCSJ\",
\"type\": [
\"null\",
\"string\"
]
},
{
\"name\": \"CSYS\",
\"type\": [
\"null\",
\"string\"
]
}
],
\"connect.name\": \"GAYS_KAKOUXINXI\"
}
这是我的 schema-registry.properties:</p>
listeners=http://0.0.0.0:18081
kafkastore.connection.url=localhost:2171
kafkastore.topic=_schemas
debug=false
这是我的 connect-avro-distributed.properties:
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
这是怎么发生的?我该如何解决这个问题?这是模式注册表配置的问题吗?
此致,
飞然