I'm reading data from a Kafka topic and calling a REST API asynchronously with each message I fetch. The REST API responds instantly if the message is Meher; otherwise the response takes 5 seconds.
Kafka data:
Waldo
Meher
Waldo
Meher
Waldo
Meher
Waldo
Meher
Waldo
Meher
Below is the code:
import aiohttp
import faust

app = faust.App(
    'faustApp',
    broker="kafka://localhost:9092",
    value_serializer='raw',
)
app_topic = app.topic('topic_base')

@app.agent(app_topic, concurrency=1)
async def imports_news(articles):
    async for article in articles:
        # Messages arrive as raw bytes because value_serializer='raw'
        val = article.decode('utf-8')
        url = 'http://0.0.0.0:5050/' + val
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                # Read the response body before printing it
                data = await resp.text()
                print(data)

if __name__ == '__main__':
    app.main()
Current output:
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Expected output:
Hello Meher!
Hello Meher!
Hello Meher!
Hello Meher!
Hello Meher!
Hello Waldo!
Hello Waldo!
Hello Waldo!
Hello Waldo!
Hello Waldo!
I expect the instant responses to be printed first and the slow responses after them, but currently the calls are processed sequentially, in the order the messages arrive.
If I increase the concurrency to 5, I get the expected output, but I would like the same behavior with concurrency 1. Am I missing something? Any help would be appreciated.
Update 1:
I tried the same thing with plain Python asyncio, and it works as expected:
import asyncio
import aiohttp

async def get_player(player_name):
    url = 'http://0.0.0.0:5050/' + player_name
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            data = await resp.text()
            print(data)

loop = asyncio.get_event_loop()
player_args = ["Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo",
               "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo",
               "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo"]
# gather() schedules every call up front, so the fast responses finish first
loop.run_until_complete(
    asyncio.gather(
        *(get_player(args) for args in player_args)
    )
)
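To make the difference between the two snippets concrete, here is a small simulation with no Kafka, Faust, or HTTP involved. It is only a sketch of my current guess: the agent awaits each request inside the `async for` loop, while the `gather` version schedules all requests before waiting. The names `fake_call` and `fake_stream` and the delays are made up for illustration (0.2 s stands in for the 5 s response), and I am not claiming this is how Faust works internally.

```python
import asyncio

# Hypothetical delays standing in for the REST API: "Meher" answers at once,
# any other name is slow (0.2 s here instead of 5 s to keep the demo short).
DELAYS = {"Meher": 0.0}
SLOW_DELAY = 0.2


async def fake_call(name, done):
    """Simulate one REST call and record the name when it completes."""
    await asyncio.sleep(DELAYS.get(name, SLOW_DELAY))
    done.append(name)


async def fake_stream(names):
    """Stand-in for the message stream: yields one message at a time."""
    for name in names:
        yield name


async def sequential(names):
    """Await each call inside the loop, like the agent body."""
    done = []
    async for name in fake_stream(names):
        # The next message is not read until this call has finished.
        await fake_call(name, done)
    return done


async def concurrent(names):
    """Schedule each call as a task, keep reading, then wait for all."""
    done = []
    tasks = []
    async for name in fake_stream(names):
        tasks.append(asyncio.create_task(fake_call(name, done)))
    await asyncio.gather(*tasks)
    return done


names = ["Waldo", "Meher"] * 3
sequential_order = asyncio.run(sequential(names))
concurrent_order = asyncio.run(concurrent(names))
print("sequential:", sequential_order)
print("concurrent:", concurrent_order)
```

In this simulation the sequential version completes calls in arrival order, while the task-based version completes the fast calls first. If that is the right explanation, the agent would have to hand each request to a task instead of awaiting it inline, though I don't know whether that is the recommended approach in Faust.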