0

我正在尝试将消息转发到浮士德的内部主题。正如本例中 faust 所建议的:

https://faust.readthedocs.io/en/latest/playbooks/vskafka.html

我有以下代码

in_topic = app.topic("in_topic", internal=False, partitions=1)
batching = app.topic("batching", internal=True, partitions=1)

....

@app.agent(in_topic)
async def process(stream):
    async for event in stream:
        event.forward(batching)
        yield

但是我在运行我的 pytest 时总是收到以下错误:

 AttributeError: 'str' object has no attribute 'forward'

是否删除了此功能,或者我是否需要以不同的方式指定主题以获取事件,或者这甚至是 pytest 的问题?

4

3 回答 3

1

您正在向后使用语法,这就是为什么!

in_topic = app.topic("in_topic", internal=False, partitions=1)
batching = app.topic("batching", internal=True, partitions=1)

....

@app.agent(in_topic)
async def process(stream):
    async for event in stream:
        batching.send(value=event)
        yield

应该实际工作。

编辑:

这也是我可以正确使用 pytest 和 faust 的唯一他们。
sink 方法只有在你删除了 mocked agent 的最后一个 sink 之外的所有东西时才可用,这看起来并不直观。

如果您首先导入您的接收器,然后将此装饰器添加到您的测试中,一切都应该正常工作:

from my_path import my_sink, my_agent
from unittest.mock import patch, AsyncMock

@patch(__name__ + '.my_sink.send', new_callable=AsyncMock)
def test_my_agent(test_app)
    async with my_agent.test_context() as agent:
        payload = 'This_works_fine'
        agent.put(payload)
        my_sink.send.assert_called_with(value=payload)
于 2021-03-31T06:35:11.530 回答
0

答案@Florian Hall很好。我还发现 forward 方法仅适用于事件,在我的情况下,我收到了一个 str。原因可能是浮士德主题和频道之间的差异。另一件事是使用 pytest 和 Faust 的test_context()行为很奇怪,例如,即使您没有定义接收器,它也会迫使您屈服。

于 2021-04-01T07:56:42.150 回答
0

尝试下一个:

@app.agent(in_topic, sink=[batching])
async def process(stream):
    async for event in stream:
        yield event
于 2021-03-03T19:58:46.060 回答