我使用 com.mongodb.kafka.connect.MongoSourceConnector
作为源连接器,将数据从 MongoDB 导入 Kafka。现在需要添加一个十进制(decimal)字段。但是在设置连接器并按照下面这份文档添加 decimal 逻辑类型之后,
https://avro.apache.org/docs/current/spec.html#Decimal
我收到一个错误
org.apache.kafka.connect.errors.DataException: Schema type of bytes but value was of type: decimal128
是否可以将字段 "amount" 以精确的十进制(decimal)类型写入 Kafka? 谢谢!
MongoDB 记录
{
"dateTime": 1613488304000,
"tranProc": {
"_id": "4589776833",
"amount": 770.00
}
}
我的配置
-- Registers a Kafka Connect source connector named mongo_source_tran_stage_st
-- (CREATE SOURCE CONNECTOR ... WITH syntax, as used by ksqlDB) that runs
-- com.mongodb.kafka.connect.MongoSourceConnector against database db,
-- collection Coll, and shapes record values with the inline Avro schema below.
--
-- NOTE(review): the reported failure is
--   DataException: Schema type of bytes but value was of type: decimal128
-- The amount field is the only one declared as Avro bytes (decimal logical
-- type), so the stored value is evidently a BSON Decimal128. Whether the
-- connector can map Decimal128 onto the Avro decimal logical type appears to
-- depend on the connector version - TODO confirm against the MongoDB Kafka
-- Connector documentation before relying on this schema.
CREATE SOURCE CONNECTOR mongo_source_tran_stage_st WITH (
    'tasks.max' = '1',
    'connector.class' = 'com.mongodb.kafka.connect.MongoSourceConnector',
    'output.format.value' = 'schema',
    -- Avro value schema: nullable long dateTime plus a nullable tranProc record
    -- holding a nullable long _id and a nullable decimal(38, 18) amount.
    -- The literal below is kept flush-left so its contents stay unchanged.
    'output.schema.value' = '{
"name":"MongoExchangeSchema",
"type":"record",
"namespace":"com.mongoexchange.avro",
"fields":
[
{"name": "dateTime","type": ["long", "null"] },
{ "name": "tranProc",
"type": [{"name": "tranProc", "type": "record", "fields": [
{"name": "_id", "type": ["long", "null"]},
{"name": "amount", "type": [{"type" :"bytes","logicalType":"decimal", "precision": 38, "scale": 18}, "null"]}
]
}, "null" ] }
]
}',
    'schema.compatibility' = 'BACKWARD',
    'change.stream.full.document' = 'updateLookup',
    -- Only insert and update change events pass the pipeline.
    'pipeline' = '[{"$match":{"operationType":{"$in":["insert","update"]}}}]',
    -- Key and value converters both use the same Schema Registry endpoint.
    'value.converter' = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://avro.local:8081',
    'key.converter.schemas.enable' = true,
    'key.converter' = 'io.confluent.connect.avro.AvroConverter',
    'key.converter.schema.registry.url' = 'http://avro.local:8081',
    'connection.uri' = 'mongodb://mongo_uri',
    'publish.full.document.only' = true,
    'topic.prefix' = 'mongo_struct',
    'auto.offset.reset' = 'earliest',
    'database' = 'db',
    'collection' = 'Coll',
    -- NOTE(review): numeric.mapping looks like a JDBC connector option;
    -- presumably it has no effect on MongoSourceConnector - TODO confirm.
    'numeric.mapping' = 'best_fit_or_decimal'
);
`