
I am trying to use Python 2.7 and Apache Avro (the Python client) to exchange serialized messages through a Kafka broker. Is there a way to exchange messages without creating a schema?

Here is the code (it uses a schema, sensor.avsc, which is what I want to avoid):

import io
import random

import avro.io
import avro.schema
from kafka import SimpleProducer, KafkaClient

# Connect to the broker; async=False makes the producer send synchronously
kafka = KafkaClient('localhost:9092')
producer = SimpleProducer(kafka, async=False)

# Kafka topic
topic = "sensor_network_01"

# Path to the sensor.avsc Avro schema (the part I would like to avoid)
schema_path = "sensor.avsc"
schema = avro.schema.parse(open(schema_path).read())


for i in xrange(100):
    writer = avro.io.DatumWriter(schema)
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    # creation of random data
    writer.write({"sensor_network_name": "Sensor_1", "value": random.randint(0,10), "threshold_value":10 }, encoder)

    raw_bytes = bytes_writer.getvalue()
    producer.send_messages(topic, raw_bytes)

This is the sensor.avsc file:

{
    "namespace": "sensors.avro",
    "type": "record",
    "name": "Sensor",
    "fields": [
        {"name": "sensor_network_name", "type": "string"},
        {"name": "value",  "type": ["int", "null"]},
        {"name": "threshold_value", "type": ["int", "null"]}
    ]
}

2 Answers


This code:

import io
import random

import avro.io
import avro.schema

# Path to the user.avsc Avro schema
schema_path = "user.avsc"
# The Python 2 avro package exposes parse(); avro-python3 named it Parse()
schema = avro.schema.parse(open(schema_path).read())


for i in xrange(1):
    writer = avro.io.DatumWriter(schema)
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    writer.write({"name": "123", "favorite_color": "111", "favorite_number": random.randint(0,10)}, encoder)
    raw_bytes = bytes_writer.getvalue()

    print(raw_bytes)

    bytes_reader = io.BytesIO(raw_bytes)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schema)
    user1 = reader.read(decoder)
    print(" USER = {}".format(user1))

which works with this schema:

{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}

is what you need.

Credit goes to this gist.

Answered 2018-01-23T17:01:43.580

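I haven't seen anyone do this, but I have wanted it myself. You will probably have to write it yourself, but it shouldn't be too bad, assuming the objects to be serialized are simple: all you have to do is iterate over the fields and keep a mapping from Python types to Avro types. Nested fields will need something like recursion to dig into each object.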
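The mapping described above could be sketched roughly as follows. This is only an illustration under assumptions: `infer_avro_type` is a hypothetical helper name, the choice of `long` and `double` for Python numbers is one possible convention, and only dicts, lists, and a few primitives are handled. It builds the schema as plain JSON-compatible data and does not call the avro library itself.

```python
import json

# Text and integer types differ between Python 2 and Python 3.
try:
    string_types = (basestring,)   # Python 2
    integer_types = (int, long)
except NameError:
    string_types = (str,)          # Python 3
    integer_types = (int,)


def infer_avro_type(value, name):
    """Return an Avro type description (JSON-compatible) for a Python value.

    `name` is used as the record name; nested records get names derived
    from the path of keys leading to them.
    """
    if value is None:
        return "null"
    # bool must be tested before int, because bool is a subclass of int.
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, integer_types):
        return "long"
    if isinstance(value, float):
        return "double"
    if isinstance(value, string_types):
        return "string"
    if isinstance(value, dict):
        # Keys are sorted so the field order is deterministic.
        fields = [
            {"name": key, "type": infer_avro_type(value[key], name + "_" + key)}
            for key in sorted(value)
        ]
        return {"type": "record", "name": name, "fields": fields}
    if isinstance(value, list):
        if not value:
            raise ValueError("cannot infer the item type of empty list %r" % name)
        # Assumption: every item has the same type as the first one.
        return {"type": "array", "items": infer_avro_type(value[0], name + "_item")}
    raise TypeError("no Avro mapping for %s" % type(value).__name__)


message = {"sensor_network_name": "Sensor_1", "value": 7, "threshold_value": 10}
schema_json = json.dumps(infer_avro_type(message, "Sensor"))
print(schema_json)
```

The resulting JSON string is the kind of input the schema parser in the question takes in place of the contents of sensor.avsc. Note that this only avoids writing the schema by hand: Avro's binary encoding does not carry field names or types, so the consumer still needs the same schema to decode the messages.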

Answered 2016-06-06T22:14:56.617