0

是否可以在 faust rpc 回复中访问 kafka 标头?这是两个浮士德代理的示例。一个 (the pow) 调用另一个 ( mul) 并接收一个结果作为值。但是如何知道reply topic中的kafka headers呢?

#!/usr/bin/env python
from typing import AsyncIterable
import faust
from faust import StreamT

app = faust.App('RPC99', reply_create_topic=True)
pow_topic = app.topic('RPC__pow')
mul_topic = app.topic('RPC__mul')

@app.agent(pow_topic)
async def pow(stream: StreamT[float]) -> AsyncIterable[float]:
    async for value in stream:
        yield await mul.ask(value=value ** 2)
        # Headers for the returning result here?

@app.agent(mul_topic)
async def mul(stream: StreamT[float]) -> AsyncIterable[float]:
    async for value in stream:
        yield value * 100.0
4

2 回答 2

1

您可以从以下接收到的事件中获取标头:

流事件()

@app.agent(topic)
async def iterrate(stream):
    async for event in stream.events():
        value = event.value
        offset = event.message.offset
        headers = event.headers

faust.types.events 的定义:

https://faust.readthedocs.io/en/latest/reference/faust.types.events.html#faust.types.events.EventT

于 2021-02-17T15:00:34.983 回答
0

似乎流事件具有headerstype的属性Union[List[Tuple[str, bytes]]

文档在这里有,但目前在这一点上并没有非常详细。

看起来你可以做类似的事情

@app.agent(topic)
async def process(stream):
    async for value in stream:
        do_something(value.headers)

如果您已经反序列化了消息,看起来您还可以通过流 apifaust.streams.current_event()访问原始事件。

编辑以回应Роман的评论:

看起来您必须显式发送消息才能修改标头。查看通道 apisend()中over的函数签名。我认为或者该类中发送的其他变体之一应该是您正在寻找的。send

于 2019-12-03T15:16:59.630 回答