0

我有简单的浮士德代理。它使用来自 kafka 主题的 json,并默认将它们解析为 dicts faust 序列化器:

@app.agent(source_topic, sink=[destination_topic])
async def fetch(records):
    async for record in records:
        result = do_some_stuff(record)
        yield result

反序列化本身发生在我的代码之外的某个地方,它由 faust 框架管理,而不是由我管理。如何捕获和处理反序列化异常,例如在无效 json 的情况下?

4

1 回答 1

0

您可以手动序列化数据并捕获错误:

topic = app.topic('custom', value_type=bytes)

@app.agent
async def processor(stream):
    async for payload in stream:
        data = json.loads(payload)

来源:https ://faust.readthedocs.io/en/latest/userguide/models.html

于 2021-05-03T13:11:41.030 回答