我正在尝试使用我拥有的模式向 kafka 生成一条 avro 消息。(使用 confluent-kafka python 包生产者)
生产者工作正常,除了“字节”字段值,它没有在消费者端正确反序列化。这些“字节”字段的值实际上是十进制值,必须设置比例和精度。
我可以在 Scala 中做到这一点,下面是 Scala 的代码,我正在寻找 Python。
架构(仅特定字段):
{
"name": "Longitude",
"type": ["null", {
"type": "bytes",
"scale": 10,
"precision": 13,
"connect.version": 1,
"connect.parameters": {
"scale": "10",
"connect.decimal.precision": "13"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}],
"default": null
}
斯卡拉实现:
def getByteBufferOrNullForByteTyeField(columnVal : AnyRef, precision: Int, scale: Int): ByteBuffer = {
if(columnVal != null){
val doubleVal = columnVal.asInstanceOf[Double]
import java.math.BigDecimal
import java.nio.ByteBuffer
val decimalSchema = Decimal.schema(scale)
val mathContext = new MathContext(precision, RoundingMode.HALF_DOWN)
val valueBuffer = ByteBuffer.wrap(
Decimal.fromLogical(
decimalSchema,
new BigDecimal(doubleVal, mathContext).setScale(10, RoundingMode.HALF_UP)
)
)
valueBuffer
}else null
}
谢谢