
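I am using com.mongodb.kafka.connect.MongoSourceConnector as the source for importing data from MongoDB into Kafka. I need to add a decimal field. However, after setting up the connector and adding the type described in the Avro specification (https://avro.apache.org/docs/current/spec.html#Decimal), I get this error: org.apache.kafka.connect.errors.DataException: Schema type of bytes but value was of type: decimal128. Is it possible to write an exact decimal for the field "amount"? Thanks!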

MongoDB record

{
     "dateTime": 1613488304000,
       "tranProc": {
          "_id": "4589776833",
          "amount": 770.00
   }
}

My config

CREATE SOURCE CONNECTOR mongo_source_tran_stage_st WITH (
    'tasks.max'='1',
    'connector.class'='com.mongodb.kafka.connect.MongoSourceConnector',
    'output.format.value'='schema',
    'output.schema.value'='{
    "name":"MongoExchangeSchema",
    "type":"record",
    "namespace":"com.mongoexchange.avro",
    "fields":
    [
        {"name": "dateTime","type": ["long", "null"] },
        { "name": "tranProc",
             "type": [{"name": "tranProc", "type": "record", "fields": [
                  {"name": "_id", "type": ["long", "null"]},

                     {"name": "amount", "type": [{"type" :"bytes","logicalType":"decimal", "precision": 38, "scale": 18}, "null"]}

                  ]
                 }, "null" ] }               
         ]       
    }',
    'schema.compatibility'= 'BACKWARD',
    'change.stream.full.document'='updateLookup',
    'pipeline'='[{"$match":{"operationType":{"$in":["insert","update"]}}}]',
    'value.converter'='io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url'='http://avro.local:8081',
    'key.converter.schemas.enable'=true,
    'key.converter'='io.confluent.connect.avro.AvroConverter',
    'key.converter.schema.registry.url'='http://avro.local.local:8081',
    'connection.uri'='mongodb://mongo_uri',
    'publish.full.document.only'= true,
    'topic.prefix'='mongo_struct',
    'auto.offset.reset' = 'earliest',
    'database'='db',
    'collection'='Coll',
    'numeric.mapping'='best_fit_or_decimal'
);
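
For context on the "Schema type of bytes" part of the error: the Avro decimal logical type used for `amount` in the schema above is stored as bytes holding the two's-complement, big-endian representation of the unscaled integer, with the scale fixed by the schema. The following is only a minimal Python sketch of that encoding, not the connector's own code; the function names `encode_avro_decimal` and `decode_avro_decimal` are hypothetical, and the default precision of 38 and the scale of 18 are taken from the schema in the config.

```python
from decimal import Decimal, localcontext


def encode_avro_decimal(value: Decimal, scale: int, precision: int = 38) -> bytes:
    """Return the two's-complement, big-endian bytes of the unscaled integer."""
    with localcontext() as ctx:
        ctx.prec = precision
        # Fix the number of fractional digits to `scale`. Extra fractional
        # digits are rounded; decimal.InvalidOperation is raised if the
        # result would need more than `precision` digits.
        quantized = value.quantize(Decimal(1).scaleb(-scale))
    sign, digits, _ = quantized.as_tuple()
    unscaled = int("".join(map(str, digits)))
    if sign:
        unscaled = -unscaled
    # Smallest byte length that still leaves room for the sign bit.
    magnitude_bits = (unscaled if unscaled >= 0 else ~unscaled).bit_length()
    return unscaled.to_bytes(magnitude_bits // 8 + 1, "big", signed=True)


def decode_avro_decimal(data: bytes, scale: int, precision: int = 38) -> Decimal:
    """Rebuild the decimal value from its unscaled bytes and the schema's scale."""
    unscaled = int.from_bytes(data, "big", signed=True)
    with localcontext() as ctx:
        ctx.prec = precision
        return Decimal(unscaled).scaleb(-scale)


# Round-trip the amount from the MongoDB record with the schema's scale.
raw = encode_avro_decimal(Decimal("770.00"), scale=18)
print(raw.hex())
print(decode_avro_decimal(raw, scale=18))
```

The sketch only illustrates what a bytes/decimal field is expected to contain. It does not show how the connector itself maps a BSON Decimal128 value onto that field.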