0

我正在尝试使用 Faust 将数据从 Kafka 中的一个主题发送到另一个主题。如果原始主题中的值为 None (消息是墓碑),我将带有 None 值的当前键发送到目标主题。

async def order_delete(key, target_topic):
    await target_topic.send(key=key, value=None, headers={'__op': b'd'})
                                                       

@app.agent(topic)
async def order_info(orders):
    async for key, order in orders.items():

        if order is None:
            for target_topic in target_topics.values():
                await order_delete(key, target_topic)
            continue

我希望它成为目标主题中的墓碑,但事实并非如此。它有一个删除标题和以下值:

"ERROR" :{ "message":"src 属性必须是有效的 json 对象" }

我是浮士德的新手,所以我可能遗漏了一些东西......有没有办法用它发送墓碑?

4

2 回答 2

0

看起来您遇到了与此处标记的问题类似的问题 https://github.com/robinhood/faust/issues/143

于 2022-02-18T15:03:36.620 回答
0

对我来说最简单的解决方案看起来像发送空字典{}。它仍然是有效的 json。

于 2022-03-03T14:16:58.883 回答