0

我正在尝试使用我拥有的模式向 kafka 生成一条 avro 消息。(使用 confluent-kafka python 包生产者)

生产者工作正常,除了“字节”字段值,它没有在消费者端正确反序列化。这些“字节”字段的值实际上是十进制值,必须设置比例和精度。

我可以在 Scala 中做到这一点,下面是 Scala 的代码,我正在寻找 Python。

架构(仅特定字段):

{
                "name": "Longitude",
                "type": ["null", {
                    "type": "bytes",
                    "scale": 10,
                    "precision": 13,
                    "connect.version": 1,
                    "connect.parameters": {
                        "scale": "10",
                        "connect.decimal.precision": "13"
                    },
                    "connect.name": "org.apache.kafka.connect.data.Decimal",
                    "logicalType": "decimal"
                }],
                "default": null
            }

斯卡拉实现:

def getByteBufferOrNullForByteTyeField(columnVal : AnyRef, precision: Int, scale: Int): ByteBuffer = {
  if(columnVal != null){
    val doubleVal = columnVal.asInstanceOf[Double]
    import java.math.BigDecimal
    import java.nio.ByteBuffer
    val decimalSchema = Decimal.schema(scale)
    val mathContext = new MathContext(precision, RoundingMode.HALF_DOWN)
    val valueBuffer = ByteBuffer.wrap(
      Decimal.fromLogical(
        decimalSchema,
        new BigDecimal(doubleVal, mathContext).setScale(10, RoundingMode.HALF_UP)
      )
    )
    valueBuffer
  }else null
}

谢谢

4

0 回答 0