2

我正在尝试从已使用 google 的 protobuf 序列化的 kafka topic 中读取数据。

我使用 protoc 编译了 proto 文件,生成了 pb2 文件。

现在我正在尝试使用 faust 创建一个流处理器,但我找不到将 pb2 文件用作 key_serializer 和 value_serializer 的方法。

这是我尝试过的:

import faust
from proto.topic_pb2 import topic


# Build the Faust application with in-memory table store and cache.
app = faust.App(
    'faust-consumer',
    broker='kafka://',
    store="memory://",
    cache="memory://",
)

# NOTE(review): faust.Schema expects Codec instances (or registered codec
# names) for key_serializer/value_serializer; passing the generated pb2
# message classes directly, as below, is exactly what does not work — this
# is the problem the question asks about.
schema = faust.Schema(
    ## key_type=topic.PK,
    ## value_type=topic,
    key_serializer=topic.PK,
    value_serializer=topic,
)

# WARNING: this rebinds the name `topic`, shadowing the pb2 class imported
# above — after this line the protobuf class is no longer reachable.
topic = app.topic(
    'topic',
    schema=schema
)


@app.agent(topic)
async def consume(topic):
    # `topic` here is the stream of incoming events, not the topic object.
    async for event in topic:
        print(event)


if __name__ == "__main__":
    app.main()

有人知道如何在序列化程序中使用 pb2 吗?

4

3 回答 3

2

伙计,过去一周我也试图做同样的事情。经过努力,我终于得到了一些工作——不是最好的方法——但它工作得很好。

所以最初我使用这个 python 编译器:https://github.com/danielgtaylor/python-betterproto 来生成带有数据类/类型提示的 *.py 文件。

然后,我能够使用帮助程序动态创建 Faust.Record 类:

import abc
import inspect
from typing import Type

import betterproto
import faust

GENERATED_SUFFIX = "__FaustRecord_Auto"


def _import_relative_class(module: str, klass_name: str):
    resolved_import = __import__(module, fromlist=[klass_name])
    klass = getattr(resolved_import, klass_name)
    return klass


def _is_record(attype: Type):
    return (
        inspect.isclass(attype)
        and isinstance(attype, betterproto.Message)
        or isinstance(attype, abc.ABCMeta)
    )


def _build_record_annotations(klass: Type):
    """Map *klass*'s annotations to faust-compatible ones.

    Nested message types are recursively wrapped via make_faust_record;
    string annotations (forward references) are resolved against the
    class's own module first; everything else passes through unchanged.
    """
    resolved = {}
    for field_name, field_type in klass.__annotations__.items():
        if _is_record(field_type):
            # Nested message: wrap it in its own generated Record type.
            resolved[field_name] = make_faust_record(field_type)
        elif isinstance(field_type, str):
            # Forward reference: resolve the name, then wrap the class.
            target = _import_relative_class(klass.__module__, field_type)
            resolved[field_name] = make_faust_record(target)
        else:
            resolved[field_name] = field_type
    return resolved


def make_faust_record(klass: Type):
    """Dynamically create a faust.Record subclass mirroring *klass*.

    The generated type inherits from both faust.Record and the original
    class, carries a suffixed name so generated types are recognizable,
    and gets its annotations rewritten so nested messages are also
    wrapped as Records.
    """
    type_name = f"{klass.__name__}{GENERATED_SUFFIX}"
    # Multiple inheritance: Record first so Faust's machinery wins the MRO.
    record_type = type(type_name, (faust.Record, klass), {})
    record_type.__annotations__ = _build_record_annotations(klass)
    # NOTE(review): _init_subclass is a private faust hook normally invoked
    # via __init_subclass__; calling it manually is needed here because the
    # type was built with type(), but it may break across faust versions.
    record_type._init_subclass()

    return record_type

现在你可以像这样使用它:

import faust
from proto.your_models import YourModel # Import your generated proto here
from faust_converter import make_faust_record


# Build the Faust application with in-memory table store and cache.
app = faust.App(
    'faust-consumer',
    broker='kafka://',
    store="memory://",
    cache="memory://",
)

# Wrap the generated betterproto model in a dynamically created
# faust.Record subclass so Faust can (de)serialize it natively.
model_record = make_faust_record(YourModel)

topic = app.topic(
    'topic',
    value_type=model_record
)


@app.agent(topic)
async def consume(topic):
    # `topic` here is the stream of deserialized records, not the topic object.
    async for event in topic:
        print(event)


if __name__ == "__main__":
    app.main()
于 2020-11-19T12:57:14.477 回答
1

我也在尝试将 Protobuf 与 Faust 一起使用。

下面提到的是使用 Faust Serialiser Codecs 的解决方案。 faust-protobuf https://github.com/hemantkashniyal/faust-protobuf

proto_serializer.py

from faust.serializers import codecs
from typing import Any

from google.protobuf import json_format
from google.protobuf.json_format import MessageToJson
from google.protobuf.json_format import MessageToDict
from google.protobuf import text_format
from google.protobuf.text_format import MessageToString
from google.protobuf.text_format import MessageToBytes

class ProtobufSerializer(codecs.Codec):
    """Faust codec that (de)serializes messages of a fixed protobuf type.

    Parameters
    ----------
    pb_type:
        The generated protobuf message class (from a *_pb2 module) that
        this codec instantiates when decoding.
    """

    def __init__(self, pb_type: Any):
        self.pb_type = pb_type
        # Fixed: `super(self.__class__, self)` resolves against the runtime
        # class, which recurses infinitely if this codec is ever subclassed;
        # the zero-argument form is the correct Python 3 idiom.
        super().__init__()

    def _dumps(self, pb: Any) -> bytes:
        """Serialize a protobuf message to its wire-format bytes."""
        return pb.SerializeToString()

    def _loads(self, s: bytes) -> Any:
        """Parse wire-format bytes into a new instance of the bound type."""
        pb = self.pb_type()
        pb.ParseFromString(s)
        return pb

app.py

import faust
from google.protobuf.json_format import MessageToJson

from .proto.greetings_pb2 import Greeting

from .proto_serializer import ProtobufSerializer

# Build the Faust application with in-memory table store and cache.
app = faust.App(
    'faust-consumer',
    broker='kafka://', # TODO: update kafka endpoint
    store="memory://",
    cache="memory://",
)

# Both key and value are (de)serialized as Greeting protobuf messages.
greetings_schema = faust.Schema(
    key_serializer=ProtobufSerializer(pb_type=Greeting),
    value_serializer=ProtobufSerializer(pb_type=Greeting),
)

topic = app.topic(
    'greetings',
    schema=greetings_schema
)

@app.agent(topic)
async def consume(topic):
    # Each event arrives already parsed into a Greeting instance.
    async for event in topic:
        print(MessageToJson(event))

@app.timer(5)
async def produce():
    # Every 5 seconds, push ten Greeting messages through the agent.
    # NOTE(review): `message=i` passes an int — assumes the proto field
    # is numeric; confirm against greetings.proto.
    for i in range(10):
        data = Greeting(hello="world", message=i)
        await consume.send(value=data)

if __name__ == "__main__":
    app.main()
于 2020-11-28T13:29:40.917 回答
0

我可以通过创建一个 Serializer 类来做到这一点:

import logging
from abc import ABCMeta, abstractmethod
from importlib import import_module

import faust
from faust.serializers.codecs import Codec
from google.protobuf.json_format import MessageToDict


def get_proto(topic_name, only_pk=False):
    """Resolve the generated protobuf class for *topic_name*.

    Imports ``protodef.<topic_name>_pb2`` and returns the message class
    named after the last dotted segment of *topic_name*.  When *only_pk*
    is true, the nested primary-key class ``PK`` is returned instead.
    Results are memoized on the function object itself.
    """
    # Lazily create the per-function cache on first call.
    if not hasattr(get_proto, "topics"):
        get_proto.topics = {}

    # Fixed: the original overwrote the cache entry on every call, so the
    # memoization never took effect; import only on a cache miss now.
    if topic_name not in get_proto.topics:
        module = import_module("protodef.{}_pb2".format(topic_name))
        get_proto.topics[topic_name] = getattr(
            module, topic_name.split(".")[-1]
        )

    proto = get_proto.topics[topic_name]
    return proto.PK if only_pk else proto


class ProtoSerializer(Codec, metaclass=ABCMeta):
    """Abstract Faust codec backed by a protobuf class looked up per topic.

    Subclasses decide, via :meth:`only_key`, whether this codec handles
    the key (the nested ``PK`` message) or the full value message.
    """

    @abstractmethod
    def only_key(self):
        """Return True when this codec serializes only the primary key."""
        ...

    def as_proto(self, topic_name):
        """Bind this codec to the protobuf class of *topic_name* (fluent)."""
        self._proto = get_proto(topic_name, self.only_key())
        return self

    def _loads(self, b):
        """Deserialize wire bytes *b* into a plain dict."""
        data = MessageToDict(
            self._proto.FromString(b),
            preserving_proto_field_name=True,
            including_default_value_fields=True,
        )

        # remove the key object from the unserialized message
        data.pop("key", None)
        return data

    def _dumps(self, o):
        """Serialize dict *o* into protobuf wire bytes.

        Returns None for falsy input, which Kafka interprets as a
        tombstone (delete marker).
        """
        # for deletes
        if not o:
            return None

        obj = self._proto()

        # add the key object to the message before serializing; every
        # PK field must be present in the input or we refuse to encode.
        if hasattr(obj, "PK"):
            for k in obj.PK.DESCRIPTOR.fields_by_name.keys():
                if k not in o:
                    raise Exception(
                        "Invalid object `{}` for proto `{}`".format(o, self._proto)
                    )
                setattr(obj.key, k, o[k])

        for k, v in o.items():
            if hasattr(obj, k):
                setattr(obj, k, v)
            else:
                # Fixed: the original called `ghost.debug`, an undefined
                # name that raised NameError whenever this branch ran.
                logging.getLogger(__name__).debug(
                    "Invalid value-attribute `%s` for proto `%s`", k, self._proto
                )

        return obj.SerializeToString()


class ProtoValue(ProtoSerializer):
    """Codec for message values: uses the full protobuf message class."""

    def only_key(self):
        """Values use the whole message, not just the primary key."""
        return False


class ProtoKey(ProtoSerializer):
    """Codec for message keys: uses only the nested ``PK`` message class."""

    def only_key(self):
        """Keys serialize just the primary-key sub-message."""
        return True

然后按如下方式使用它:

import faust
from utils.serializer import ProtoKey, ProtoValue


# Build the Faust application with in-memory table store and cache.
app = faust.App(
    'faust-consumer',
    broker='kafka://',
    store="memory://",
    cache="memory://",
)


# Bind each codec to the protobuf class generated for the 'topic' topic;
# as_proto() returns the codec itself, so it can be passed inline.
topic = app.topic(
    'topic',
    key_serializer=ProtoKey().as_proto('topic'),
    value_serializer=ProtoValue().as_proto('topic')
)


@app.agent(topic)
async def consume(topic):
    # Events arrive as plain dicts produced by ProtoSerializer._loads.
    async for event in topic:
        print(event)


if __name__ == "__main__":
    app.main()
于 2020-11-20T15:27:52.457 回答