我编写的代码允许我在处理前一个数据块的同时开始从 API 获取下一个数据块。
我希望这始终在任何给定时刻同时获取多达5 个块,但是即使队列中最后一个请求在任何其他请求之前完成,返回的数据也应始终以正确的顺序处理。
如何更改我的代码以实现这一点?
class MyClient:
async def fetch_entities(
self,
entity_ids:List[int],
objects:Optional[List[str]],
select_inbound:Optional[List[str]]=None,
select_outbound:Optional[List[str]]=None,
queue_size:int=5,
chunk_size:int=500,
):
"""
Fetch entities in chunks
While one chunk of data is being processed the next one can
already be fetched. In other words: Data processing does not
block data fetching.
"""
objects = ",".join(objects)
if select_inbound:
select_inbound = ",".join(select_inbound)
if select_outbound:
select_outbound = ",".join(select_outbound)
queue = asyncio.Queue(maxsize=queue_size)
# TODO: I want to be able to fill the queue with requests that are already executing
async def queued_chunks():
for ids in chunks(entity_ids, chunk_size):
res = await self.client.post(urllib.parse.quote("entities:fetchdata"), json={
"entityIds": ids,
"objects": objects,
"inbound": {
"linkTypeIds": select_outbound,
"objects": objects,
} if select_inbound else {},
"outbound": {
"linkTypeIds": select_inbound,
"objects": objects,
} if select_outbound else {},
})
await queue.put(res)
await queue.put(None)
asyncio.create_task(queued_chunks())
while True:
res = await queue.get()
if res is None:
break
res.raise_for_status()
queue.task_done()
for entity in res.json():
yield entity