5

我正在尝试使用Python解释DebeziumKafka中存储的Avro记录

           {
              "name": "id",
              "type": {
                "type": "bytes",
                "scale": 0,
                "precision": 64,
                "connect.version": 1,
                "connect.parameters": {
                  "scale": "0"
                },
                "connect.name": "org.apache.kafka.connect.data.Decimal",
                "logicalType": "decimal"
              }
            }

我不确定这对应于哪个 Python 3 原始类型。这个值如何反序列化?

提前致谢!

4

3 回答 3

2

org.apache.kafka.connect.data.Decimal是未缩放整数的 base64 编码字节表示。为了将此值转换为Decimal,您需要将 base64 字符串解码为字节,获取整数,然后按parameters.scale值缩放。

此架构:

{
  "type": "bytes",
  "name": "org.apache.kafka.connect.data.Decimal",
  "version": 1,
  "parameters": {
    "scale": "9",
    "connect.decimal.precision": "38"
  },
  "field": "amount"
}

可以使用以下代码段进行转换(在Pyfiddle上尝试):

ctx = decimal.Context()
ctx.prec = 38  # connect.decimal.precision = 38
result = ctx.create_decimal(
    int.from_bytes(base64.b64decode("GZ6ZFQvYpA=="), byteorder='big')
) / 10 ** 9  # scale = 9
于 2020-03-29T00:40:54.937 回答
2

如果你看

https://insight.io/github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java

public static byte[] fromLogical(Schema schema, BigDecimal value) {
    if (value.scale() != scale(schema))
        throw new DataException("BigDecimal has mismatching scale value for given Decimal schema");
    return value.unscaledValue().toByteArray();
}

如您所见,它使用 BigDecimal,这相当于Decimal在 python

Java 的 BigDecimal 的 python 是什么?

因此,在这种情况下,您应该寻找 Decimal。

第 2 部分 - 反序列化

关于反序列化,我需要反馈来更新答案。到目前为止,您如何在其他领域做到这一点?

于 2017-10-26T07:27:41.707 回答
1

您的代码正在运行,这很棒。但是当我将它用于负数时,它会给出错误的结果。例如,当我使用您的代码返回 635.02 时,-20.62 数字被 debezium 转换为 kafka 的“+CA =”。

有什么方法可以转换负数。谢谢

编辑:我从下面的链接中找到了解决方案,然后我就这样改变了。

import decimal
import base64
def big_decimal_to_decimal(big_decimal, scale, precision):
    bytes_val = base64.decodebytes(big_decimal.encode())
    bval = "".join("{0:08b}".format(c) for c in bytes_val)
    intval = int(bval, 2)
    if bytes_val[0] & 0x70 != 0:
        intval -= int('1' + '00' * len(bytes_val), 16)
    return intval/(10**scale)

链接:将base64 编码的字节数组解码为(负)十进制值(Java 到 Python)

于 2022-03-04T12:41:18.807 回答