我通过创建一个继承自 Faust `Codec` 的 Protobuf 序列化器(Serializer)类做到了这一点:
import logging
from abc import ABCMeta, abstractmethod
from importlib import import_module

import faust
from faust.serializers.codecs import Codec
from google.protobuf.json_format import MessageToDict
def get_proto(topic_name, only_pk=False):
    """Return the protobuf class generated for ``topic_name``.

    The class is read from the module ``protodef.<topic_name>_pb2`` under the
    last dot-separated component of ``topic_name`` and memoized on the
    function attribute ``get_proto.topics``.

    Args:
        topic_name: Topic name; also used to build the generated module path.
        only_pk: When true, return the class's ``PK`` attribute instead of
            the class itself.

    Returns:
        The resolved class, or its ``PK`` attribute when ``only_pk`` is true.

    Raises:
        ImportError: If the generated module cannot be imported.
        AttributeError: If the module lacks the expected class, or the class
            has no ``PK`` attribute while ``only_pk`` is true.
    """
    # The cache lives on the function object so code that reads
    # ``get_proto.topics`` directly keeps working.
    if not hasattr(get_proto, "topics"):
        get_proto.topics = {}
    cache = get_proto.topics

    # Resolve the class only on a cache miss; previously the module was
    # re-imported and the cache entry overwritten on every single call.
    if topic_name not in cache:
        module = import_module("protodef.{}_pb2".format(topic_name))
        cache[topic_name] = getattr(module, topic_name.split(".")[-1])

    proto = cache[topic_name]
    return proto.PK if only_pk else proto
class ProtoSerializer(Codec, metaclass=ABCMeta):
    """Faust codec that converts between dicts and protobuf-encoded bytes.

    Subclasses implement :meth:`only_key` to choose whether the codec is
    bound to the full message class or only to its ``PK`` attribute.
    :meth:`as_proto` must be called before use so that ``self._proto`` is set.
    """

    @abstractmethod
    def only_key(self):
        """Return True to bind to the ``PK`` class, False for the message class."""
        ...

    def as_proto(self, topic_name):
        """Bind this codec to the protobuf class resolved for ``topic_name``.

        Returns:
            ``self``, so the call can be chained onto the constructor.
        """
        self._proto = get_proto(topic_name, self.only_key())
        return self

    def _loads(self, b):
        """Parse serialized bytes ``b`` into a dict, dropping the ``key`` entry."""
        data = MessageToDict(
            self._proto.FromString(b),
            preserving_proto_field_name=True,
            including_default_value_fields=True,
        )
        # remove the key object from the deserialized message
        data.pop("key", None)
        return data

    def _dumps(self, o):
        """Serialize the mapping ``o`` into protobuf bytes.

        Returns:
            The serialized message, or ``None`` when ``o`` is falsy
            (used for deletes).

        Raises:
            ValueError: If the bound class exposes ``PK`` and ``o`` is
                missing one of the ``PK`` fields.
        """
        # for deletes
        if not o:
            return None

        obj = self._proto()

        # add the key object to the message before serializing
        if hasattr(obj, "PK"):
            for k in obj.PK.DESCRIPTOR.fields_by_name.keys():
                if k not in o:
                    # ValueError is still caught by callers catching Exception.
                    raise ValueError(
                        "Invalid object `{}` for proto `{}`".format(o, self._proto)
                    )
                setattr(obj.key, k, o[k])

        for k, v in o.items():
            if hasattr(obj, k):
                setattr(obj, k, v)
            else:
                # Unknown keys are skipped and only reported at debug level.
                # The original called an undefined name (``ghost``) here,
                # which raised NameError instead of logging.
                logging.getLogger(__name__).debug(
                    "Invalid value-attribute `%s` for proto `%s`", k, self._proto
                )
        return obj.SerializeToString()
class ProtoValue(ProtoSerializer):
    """Serializer bound to the full protobuf message class."""

    def only_key(self) -> bool:
        """Select the whole message class rather than its ``PK``."""
        return False
class ProtoKey(ProtoSerializer):
    """Serializer bound only to the message class's ``PK`` attribute."""

    def only_key(self) -> bool:
        """Select the ``PK`` class instead of the whole message class."""
        return True
然后按如下方式使用它(假设上面的代码保存为 `utils/serializer.py`):
import faust
from utils.serializer import ProtoKey, ProtoValue
# Application object; broker, table store and cache URLs are given inline.
app = faust.App(
    'faust-consumer',
    broker='kafka://',
    store="memory://",
    cache="memory://",
)

# Build the key and value codecs first, both bound to the 'topic' proto,
# then attach them to the topic description.
_key_codec = ProtoKey().as_proto('topic')
_value_codec = ProtoValue().as_proto('topic')
topic = app.topic(
    'topic',
    key_serializer=_key_codec,
    value_serializer=_value_codec,
)
@app.agent(topic)
async def consume(topic):
    """Print every item yielded by the async iterable passed in as ``topic``."""
    # NOTE: the parameter shadows the module-level ``topic``; inside this
    # function it is whatever the agent decorator passes in.
    async for message in topic:
        print(message)
# Hand control to the app's ``main()`` only when executed directly as a script.
if __name__ == "__main__":
    app.main()