11

我很好奇您应该如何表达您希望将消息传递到浮士德的 Kafka 主题。他们自述文件中的示例似乎没有写入主题:

import faust

class Greeting(faust.Record):
    from_name: str
    to_name: str

app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)

@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')

@app.timer(interval=1.0)
async def example_sender(app):
    await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )

if __name__ == '__main__':
    app.main()

我希望hello.send在上面的代码中向主题发布消息,但似乎没有。

有许多阅读主题的示例,以及许多使用 cli 推送临时消息的示例。梳理完文档后,我没有看到任何明确的在代码中发布到主题的示例。我只是疯了,上面的代码应该可以工作吗?

4

5 回答 5

5

您可以使用它sink来告诉 Faust 将代理函数的结果传递到何处。如果需要,您还可以一次将多个主题用作接收器。

@app.agent(topic_to_read_from, sink=[destination_topic])
async def fetch(records):
    async for record in records:
        result = do_something(record)
        yield result
于 2019-11-21T19:01:36.163 回答
5

send()函数是调用写入主题的正确函数。您甚至可以指定一个特定的分区,就像等效的 Java API 调用一样。

以下是该send()方法的参考:

https://faust.readthedocs.io/en/latest/reference/faust.topics.html#faust.topics.Topic.send

于 2020-01-22T18:03:41.757 回答
1

如果您只想要一个 Faust 生产者(不与消费者/接收器结合),那么原始问题实际上有正确的代码,这是一个功能齐全的脚本,可将消息发布到任何 Kafka 都可以使用的“faust_test”Kafka 主题/浮士德消费者。

像这样运行下面的代码:python faust_producer.py worker

"""Simple Faust Producer"""
import faust

if __name__ == '__main__':
    """Simple Faust Producer"""

    # Create the Faust App
    app = faust.App('faust_test_app', broker='localhost:9092')
    topic = app.topic('faust_test')

    # Send messages
    @app.timer(interval=1.0)
    async def send_message(message):
        await topic.send(value='my message')

    # Start the Faust App
    app.main()
于 2020-12-04T02:13:55.617 回答
0

不知道这有多相关,但我在尝试学习浮士德时遇到了这个问题。从我读到的,这是正在发生的事情:

topic = app.topic('hello-topic', value_type=Greeting)

这里的误解是您创建的主题是您尝试消费/阅读的主题。您当前创建的主题不执行任何操作。

await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )

这实质上创建了一个中间 kstream,它将值发送到您的 hello(greetings) 函数。def hello(...) 将在流中有新消息时调用,并将处理正在发送的消息。

@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')

这是从 hello.send(...) 接收 kafka 流并将其打印到控制台(没有输出到创建的“主题”)。您可以在此处向新主题发送消息。因此,您可以执行以下操作,而不是打印:

topic.send(value = "my message!")

或者:

这是你正在做的事情:

  1. example_sender() 向 hello(...) 发送消息(通过中间 kstream)
  2. hello(...) 获取消息并打印它 注意:没有将消息发送到正确的主题

这是您可以执行的操作:

  1. example_sender() 向 hello(...) 发送消息(通过中间 kstream)

  2. hello(...) 接收消息并打印

  3. hello(...) 还向创建的主题发送一条新消息(假设您正在尝试转换原始数据)

     app = faust.App('hello-app', broker='kafka://localhost')
     topic = app.topic('hello-topic', value_type=Greeting)
     output_topic = app.topic('test_output_faust', value_type=str)
    
     @app.agent(topic)
     async def hello(greetings):
         async for greeting in greetings:
             new_message = f'Hello from {greeting.from_name} to {greeting.to_name}'
             print(new_message)
             await output_topic.send(value=new_message)
    
于 2021-05-17T05:14:56.097 回答
0

所以我们只是遇到了需要向主题以外的sink主题发送消息。

我们找到的最简单的方法是:foo = await my_topic.send_soon(value="wtfm8").

您也可以send像下面使用 asyncio 事件循环直接使用。

loop = asyncio.get_event_loop()
foo = await ttopic.send(value="wtfm8??")
loop.run_until_complete(foo)
于 2020-11-17T17:02:14.987 回答