我正在尝试使用 Faust 将数据从 Kafka 中的一个主题发送到另一个主题。如果原始主题中的值为 None (消息是墓碑),我将带有 None 值的当前键发送到目标主题。
async def order_delete(key, target_topic):
await target_topic.send(key=key, value=None, headers={'__op': b'd'})
@app.agent(topic)
async def order_info(orders):
async for key, order in orders.items():
if order is None:
for target_topic in target_topics.values():
await order_delete(key, target_topic)
continue
我希望它成为目标主题中的墓碑,但事实并非如此。它有一个删除标题和以下值:
"ERROR" :{ "message":"src 属性必须是有效的 json 对象" }
我是浮士德的新手,所以我可能遗漏了一些东西......有没有办法用它发送墓碑?