0

我有一个 Kafka 集群正在运行,我想将 L2-orderbook 快照存储到一个主题中,该主题有一个 {key:value} 对字典,其中键的类型为 float,如下例所示:

{
    'exchange': 'ex1',
    'symbol': 'sym1',
    'book': {
        'bid': {
            100.0: 20.0,
            101.0: 21.3,
            102.0: 34.6,
            ...,
        },
        'ask': {
            100.0: 20.0,
            101.0: 21.3,
            102.0: 34.6,
            ...,
        }
    },
    'timestamp': 1642524222.1160505
}

我下面的架构建议不起作用,我很确定这是因为“出价”和“询问”字典中的键不是字符串类型。

{
    "namespace": "confluent.io.examples.serialization.avro",
    "name": "L2_Book",
    "type": "record",
    "fields": [
        {"name": "exchange", "type": "string"},
        {"name": "symbol", "type": "string"},
        {"name": "book", "type": "record", "fields": {
            "name": "bid", "type": "record", "fields": {
                {"name": "price", "type": "float"},
                {"name": "volume", "type": "float"}
            },
            "name": "ask", "type": "record", "fields": {
                {"name": "price", "type": "float"},
                {"name": "volume", "type": "float"}
            }
        },
        {"name": "timestamp", "type": "float"}
    ]
}

KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="no value and no default for bids"}

什么是适当的 avro 模式?

4

2 回答 2

1

首先,你有一个错字。fields需要是架构定义中的数组。

但是,您的出价(和要价)对象不是记录。他们是一个map<float, float>. 换句话说,它没有文字pricevolume键。

Avro 具有Map types,但键“假定为字符串”。

欢迎您尝试

{"name": "bid", "type": "map", "values": "float"}

否则,您需要重新格式化数据负载,例如作为对象列表

'bid': [
     {'price': 100.0, 'volume': 20.0},
     ...,
],

随着

{"name": "bid", "type": "array", "items": {
  "type": "record",
  "name": "BidItem",
  "fields": [
    {"name": "price", "type": "float"},
    {"name": "volume", "type": "float"}
  ]
}}
于 2022-01-18T17:41:00.630 回答
0

我终于想出了两个可行的解决方案。在这两种情况下,我都需要转换原始数据。

对我来说主要的教训是:

  1. avro 映射需要字符串类型的键
  2. 需要正确定义avro 复杂类型(例如地图和记录):
{"name": "bid", "type"
      {"type": "array", "items": {
          ...

特别感谢 OneCricketeer 为我指明了正确的方向!:-)

1) 以字符串类型的键为映射的出价和询价

数据示例

{
    'exchange': 'ex1',
    'symbol': 'sym1',
    'book': {
        'bid': {
            "100.0": 20.0,
            "101.0": 21.3,
            "102.0": 34.6,
            ...,
        },
        'ask': {
            "100.0": 20.0,
            "101.0": 21.3,
            "102.0": 34.6,
            ...,
        }
    },
    'timestamp': 1642524222.1160505
}

图式

{
    "namespace": "confluent.io.examples.serialization.avro",
    "name": "L2_Book",
    "type": "record",
    "fields": [
        {"name": "exchange", "type": "string"},
        {"name": "symbol", "type": "string"},
        {"name": "book", "type": {
            "name": "book",
            "type": "record",
            "fields": [
                {"name": "bid", "type": {
                    "type": "map", "values": "float"
                    }
                }, 
                {"name": "ask", "type": {
                    "type": "map", "values": "float"
                    }
                }
            ]}
        },
        {"name": "timestamp", "type": "float"}
    ]
}

2) 出价和询价作为记录数组

数据示例

{
    'exchange': 'ex1',
    'symbol': 'sym1',
    'book': {
        'bid': [
            {"price": 100.0, "volume": 20.0,}
            {"price": 101.0, "volume": 21.3,}
            {"price": 102.0, "volume": 34.6,}
            ...,
        ],
        'ask': [
            {"price": 100.0, "volume": 20.0,}
            {"price": 101.0, "volume": 21.3,}
            {"price": 102.0, "volume": 34.6,}
            ...,
        ]
    },
    'timestamp': 1642524222.1160505
}

图式

{
    "namespace": "confluent.io.examples.serialization.avro",
    "name": "L2_Book",
    "type": "record",
    "fields": [
        {"name": "exchange", "type": "string"},
        {"name": "symbol", "type": "string"},
        {"name": "book", "type": {
            "name": "book",
            "type": "record", 
            "fields": [
                {"name": "bid", "type": {
                    "type": "array", "items": {
                        "name": "bid",
                        "type": "record",
                        "fields": [
                            {"name": "price", "type": "float"},
                            {"name": "volume", "type": "float"}
                        ]
                    }
                }},
                {"name": "ask", "type": {
                    "type": "array", "items": {
                        "name": "ask",
                        "type": "record",
                        "fields": [
                            {"name": "price", "type": "float"},
                            {"name": "volume", "type": "float"}
                        ]
                    }
                }}
            ]}},
        {"name": "timestamp", "type": "float"}
    ]
}
于 2022-01-19T22:48:18.330 回答