
I'm trying to read data from a Kafka topic and call a REST API asynchronously with each message I fetch from the topic. The REST API responds instantly if the message is "Meher"; otherwise the response takes 5 seconds.

kafka-data

Waldo
Meher
Waldo
Meher
Waldo
Meher
Waldo
Meher
Waldo
Meher

Below is the code:

import aiohttp
import faust

app = faust.App(
    'faustApp',
    broker="kafka://localhost:9092",
    value_serializer='raw',
)

app_topic = app.topic('topic_base')


@app.agent(app_topic, concurrency=1)
async def imports_news(articles):
    async for article in articles:
        # value_serializer='raw' delivers bytes, so decode to str
        val = article.decode('utf-8')
        url = 'http://0.0.0.0:5050/' + val
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                data = await resp.text()
        print(data)


if __name__ == '__main__':
    app.main()

current-output:

Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!

expected-output:

Hello Meher!
Hello Meher!
Hello Meher!
Hello Meher!
Hello Meher!
Hello Waldo!
Hello Waldo!
Hello Waldo!
Hello Waldo!
Hello Waldo!

The expectation is to get the responses for all the REST calls with the instant responses first and the delayed responses after them, but currently the messages are processed sequentially.

If I increase the concurrency to 5, I get the expected output, but it should also work with a concurrency of 1. I'm not sure if I'm missing something. Any help with this?

Update 1:

I have tried the same thing with plain Python asyncio, and it works as expected:

import asyncio
import aiohttp


async def get_player(player_name):
    url = 'http://0.0.0.0:5050/' + player_name
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            data = await resp.text()
    print(data)


async def main():
    # 21 pairs of "Meher", "Waldo" (42 requests in total)
    player_args = ["Meher", "Waldo"] * 21
    # gather wraps each coroutine in a task, so all requests run concurrently
    await asyncio.gather(*(get_player(args) for args in player_args))


asyncio.run(main())

1 Answer


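From the Faust documentation (https://faust.readthedocs.io/en/latest/userguide/agents.html#id5) it appears that each agent instance processes one element of the stream at a time. Stream iteration is not parallelized across the available elements; a single agent instance processes the stream elements sequentially, one after another.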

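If you await something while processing an element of the stream, the agent instance will not move on to the next element (even if one is available) until processing of the current element has finished. An await does not "unlock" the agent so that it moves on to the next stream element and then resumes processing the first element once that await completes.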
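To make this concrete, here is a minimal simulation in plain asyncio; Faust and Kafka are not involved. The names `fake_rest_call`, `stream` and `single_agent` are hypothetical, and the 5-second delay is shortened to 0.05 s. Because each message is awaited before the next one is taken, the responses come back in input order, i.e. the alternating pattern of the current output.

```python
import asyncio


# Hypothetical stand-in for the REST API: "Meher" answers immediately,
# any other name is delayed (0.05 s here instead of 5 s).
async def fake_rest_call(name: str) -> str:
    if name != "Meher":
        await asyncio.sleep(0.05)
    return f"Hello {name}!"


async def stream(items):
    # Simulates a stream that yields one message at a time.
    for item in items:
        yield item


async def single_agent(items) -> list:
    results = []
    # Like a single agent instance: the next message is not taken
    # until the await for the current one has finished.
    async for name in stream(items):
        results.append(await fake_rest_call(name))
    return results


messages = ["Waldo", "Meher"] * 5
results = asyncio.run(single_agent(messages))
print(results)
```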

On the other hand, if you set concurrency=5, you get 5 instances that can take items from the stream and process them concurrently.
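A rough analogy for `concurrency=5`, again in plain asyncio and not Faust's actual implementation: five hypothetical workers share one queue, and each worker still processes its own items sequentially. `worker` and `run_with_concurrency` are illustrative names, and the delay is shortened to 0.05 s.

```python
import asyncio
import time


# Hypothetical stand-in for the REST API (0.05 s delay instead of 5 s).
async def fake_rest_call(name: str) -> str:
    if name != "Meher":
        await asyncio.sleep(0.05)
    return f"Hello {name}!"


async def worker(queue: asyncio.Queue, results: list) -> None:
    # Each worker plays the role of one agent instance:
    # sequential on its own, but sharing the stream with the others.
    while True:
        name = await queue.get()
        try:
            results.append(await fake_rest_call(name))
        finally:
            queue.task_done()


async def run_with_concurrency(items, concurrency: int) -> list:
    queue: asyncio.Queue = asyncio.Queue()
    for item in items:
        queue.put_nowait(item)
    results: list = []
    workers = [asyncio.create_task(worker(queue, results))
               for _ in range(concurrency)]
    await queue.join()  # wait until every item has been processed
    for w in workers:
        w.cancel()  # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return results


messages = ["Waldo", "Meher"] * 5
start = time.perf_counter()
results = asyncio.run(run_with_concurrency(messages, concurrency=5))
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f}s")
```

With five workers, the slow requests overlap, so the total time is roughly one delay instead of five, and most instant responses arrive before the delayed ones. The exact interleaving depends on scheduling: in this simulation the last "Meher" can still wait for a worker to free up.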

asyncio.gather works because the coroutines are wrapped in tasks and run concurrently, with gather waiting for all of their results.
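The same task-wrapping mechanism can be used inside a single consumer loop by handing each message to its own task instead of awaiting it inline. This is a hypothetical sketch in plain asyncio, not something the answer or the Faust docs prescribe; `handle` and `single_consumer_with_tasks` are illustrative names and the delay is shortened to 0.05 s.

```python
import asyncio


# Hypothetical stand-in for the REST API (0.05 s delay instead of 5 s).
async def fake_rest_call(name: str) -> str:
    if name != "Meher":
        await asyncio.sleep(0.05)
    return f"Hello {name}!"


async def stream(items):
    # Simulates a stream that yields one message at a time.
    for item in items:
        yield item


async def single_consumer_with_tasks(items) -> list:
    results = []

    async def handle(name: str) -> None:
        results.append(await fake_rest_call(name))

    tasks = []
    # Still one consumer loop, but each message gets its own task,
    # so the loop moves on without waiting for the response.
    async for name in stream(items):
        tasks.append(asyncio.create_task(handle(name)))
    await asyncio.gather(*tasks)  # wait for all requests to finish
    return results


messages = ["Waldo", "Meher"] * 5
results = asyncio.run(single_consumer_with_tasks(messages))
print(results)
```

Here all five "Meher" responses come back before the delayed ones, even with a single consumer. As an unverified caveat for a real Faust agent: moving on before a request finishes likely means the event is acknowledged before it has actually been handled, and the number of in-flight requests is unbounded (an `asyncio.Semaphore` is one common way to cap it).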

Answered 2021-04-04T16:54:43.467