
The code I have written lets me start fetching the next chunk of data from the API while the previous chunk is still being processed.

I want up to 5 chunks to be fetched concurrently at any given moment, but the returned data should always be processed in the correct order, even if the last request in the queue completes before any of the others.

How can I change my code to achieve this?

import asyncio
import urllib.parse
from typing import List, Optional

# chunks() (splits entity_ids into lists of at most chunk_size) and
# self.client (the HTTP client) are defined elsewhere.

class MyClient:
    async def fetch_entities(
        self,
        entity_ids:List[int],
        objects:Optional[List[str]],
        select_inbound:Optional[List[str]]=None,
        select_outbound:Optional[List[str]]=None,
        queue_size:int=5,
        chunk_size:int=500,
    ):
        """
        Fetch entities in chunks

        While one chunk of data is being processed the next one can
        already be fetched. In other words: Data processing does not
        block data fetching.
        """
        objects = ",".join(objects)
        if select_inbound:
            select_inbound = ",".join(select_inbound)

        if select_outbound:
            select_outbound = ",".join(select_outbound)

        queue = asyncio.Queue(maxsize=queue_size)

        # TODO: I want to be able to fill the queue with requests that are already executing

        async def queued_chunks():
            for ids in chunks(entity_ids, chunk_size):
                res = await self.client.post(urllib.parse.quote("entities:fetchdata"), json={
                    "entityIds": ids,
                    "objects": objects,
                    "inbound": {
                        "linkTypeIds": select_outbound,
                        "objects": objects,
                    } if select_inbound else {},
                    "outbound": {
                        "linkTypeIds": select_inbound,
                        "objects": objects,
                    } if select_outbound else {},
                })
                await queue.put(res)
            await queue.put(None)

        asyncio.create_task(queued_chunks())

        while True:
            res = await queue.get()
            if res is None:
                break
            res.raise_for_status()
            queue.task_done()
            for entity in res.json():
                yield entity
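
For reference, fetch_entities is an async generator, so it is consumed with async for. A minimal usage sketch, assuming MyClient sets up self.client elsewhere; process() and the field names here are hypothetical placeholders:

async def main():
    client = MyClient()
    async for entity in client.fetch_entities(
        entity_ids=list(range(10_000)),  # hypothetical IDs
        objects=["name", "status"],      # hypothetical object/field names
    ):
        process(entity)                  # placeholder for the processing step

asyncio.run(main())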

2 Answers


Instead of awaiting the coroutine before putting it in the queue, wrap it in a task, enqueue the task, and await it later:

class MyClient:
    async def fetch_entities(
        self,
        entity_ids: List[int],
        objects: Optional[List[str]],
        select_inbound: Optional[List[str]] = None,
        select_outbound: Optional[List[str]] = None,
        queue_size: int = 5,
        chunk_size: int = 500,
    ):
        """
        Fetch entities in chunks

        While one chunk of data is being processed the next one can
        already be fetched. In other words: Data processing does not
        block data fetching.
        """
        objects = ",".join(objects)
        if select_inbound:
            select_inbound = ",".join(select_inbound)

        if select_outbound:
            select_outbound = ",".join(select_outbound)

        queue = asyncio.Queue(maxsize=queue_size)

        async def queued_chunks():
            for ids in chunks(entity_ids, chunk_size):
                cor = self.client.post(urllib.parse.quote("entities:fetchdata"), json={
                    "entityIds": ids,
                    "objects": objects,
                    "inbound": {
                        "linkTypeIds": select_outbound,
                        "objects": objects,
                    } if select_inbound else {},
                    "outbound": {
                        "linkTypeIds": select_inbound,
                        "objects": objects,
                    } if select_outbound else {},
                })
                # Wrap the coroutine in a task so the request starts running
                # right away, then enqueue the task itself (not the bare coroutine).
                task = asyncio.create_task(cor)
                await queue.put(task)
            await queue.put(None)

        asyncio.create_task(queued_chunks())

        while True:
            task = await queue.get()
            if task is None:
                break
            # Await the already-running task; tasks are consumed in the order
            # they were enqueued, so the results stay in order.
            res = await task
            res.raise_for_status()
            queue.task_done()
            for entity in res.json():
                yield entity
answered 2021-01-13T23:29:02.387

I'd use two queues here: one with the chunks to process, and one for the completed chunks. You can have any number of worker tasks to process chunks, and you can put a size limit on the first queue to limit how many chunks you prefetch. Use just a single loop to receive the processed chunks, to ensure they are kept in order (your code already does this).

The trick is to put futures into both queues, one per chunk to be processed. The worker tasks that do the processing take a (chunk, future) pair from the first queue and then need to resolve the associated future by setting the POST response as its result. The loop that handles the processed chunks awaits each future in turn, and so will only proceed to the next chunk once the current one has been fully processed. For this to work, you put the chunk together with its corresponding future into the first queue for the workers to process, and you put the same future into the second queue; the second queue is what forces the chunk results to be handled in order.

So, to summarise:

  • Have two queues:
    1. chunk_queue holds (chunk, future) pairs.
    2. completed_queue holds futures, the same futures that are paired with the chunks in the other queue.
  • Create "worker" tasks that consume from the chunk queue. If you create 5, then 5 chunks will be processed in parallel. Every time a worker finishes processing a chunk, it sets the result on the corresponding future.
  • Use a "processed chunks" loop; it takes the next future from the completed queue and awaits it. Only when the specific chunk tied to that future has been completed (its result set by a worker task) will it yield the result.

As a rough sketch, it would look something like this:

chunk_queue = asyncio.Queue()
completed_queue = asyncio.Queue()
WORKER_COUNT = queue_size

async def queued_chunks():
    for ids in chunks(entity_ids, chunk_size):
        future = asyncio.Future()
        await chunk_queue.put((ids, future))
        await completed_queue.put(future)
    await completed_queue.put(None)

async def worker():
    while True:
        ids, future = await chunk_queue.get()
        try:
            res = await self.client.post(urllib.parse.quote("entities:fetchdata"), json={
                "entityIds": ids,
                "objects": objects,
                "inbound": {
                    "linkTypeIds": select_outbound,
                    "objects": objects,
                } if select_inbound else {},
                "outbound": {
                    "linkTypeIds": select_inbound,
                    "objects": objects,
                } if select_outbound else {},
            })
            res.raise_for_status()
            future.set_result(res)
        except Exception as e:
            future.set_exception(e)
            return

workers = [asyncio.create_task(worker()) for _ in range(WORKER_COUNT)]
chunk_producer = asyncio.create_task(queued_chunks())

try:
    while True:
        future = await completed_queue.get()
        if future is None:
            # all chunks have been processed!
            break
        res = await future
        for entity in res.json():
            yield entity

finally:
    for w in workers:
        w.cancel()
    await asyncio.wait(workers)

If you have to limit the number of chunks that are queued up (and not just the number being processed concurrently), set maxsize on the chunk_queue queue (to a value greater than WORKER_COUNT). Use this to limit memory requirements, for example.
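
For example, a minimal adjustment to the sketch above; the headroom of 3 extra queued chunks is just an illustrative value:

WORKER_COUNT = queue_size
# At most 3 chunks waiting in addition to those currently being fetched.
chunk_queue = asyncio.Queue(maxsize=WORKER_COUNT + 3)
completed_queue = asyncio.Queue()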

However, if you were to set maxsize to a value equal to WORKER_COUNT, you might as well get rid of the worker tasks altogether and instead put the body of the worker loop, as a coroutine wrapped in a task, into the queue of completed results. The asyncio Task class is a subclass of Future, and it automatically sets the future's result when the coroutine it wraps completes. If you are not going to put more chunks into the chunk_queue than you have worker tasks, you may as well cut out the middleman and drop the chunk_queue entirely. The tasks then go into the completed queue instead of plain futures:

completed_queue = asyncio.Queue(maxsize=queue_size)

async def queued_chunks():
    for ids in chunks(entity_ids, chunk_size):
        task = asyncio.create_task(fetch_task(ids))
        await completed_queue.put(task)
    await completed_queue.put(None)

async def fetch_task(ids):
    res = await self.client.post(urllib.parse.quote("entities:fetchdata"),
        json={
            "entityIds": ids,
            "objects": objects,
            "inbound": {
                "linkTypeIds": select_outbound,
                "objects": objects,
            } if select_inbound else {},
            "outbound": {
                "linkTypeIds": select_inbound,
                "objects": objects,
            } if select_outbound else {},
        }
    )
    res.raise_for_status()
    return res

chunk_producer = asyncio.create_task(queued_chunks())

while True:
    task = await completed_queue.get()
    if task is None:
        # all chunks have been processed!
        break
    res = await task
    for entity in res.json():
        yield entity

This version is very close to what you already have; the only difference is that the awaiting of the client POST coroutine and the check of the response status code are moved into a separate coroutine that is run as a task. You could also turn the self.client.post() coroutine itself into a task (and so not await it) and leave the response status check to the final queue-processing loop. That is what Pablo's answer proposes, so I won't repeat it here.

Note that this version starts each task before putting it into the queue, so the queue is not the only limit on the number of active tasks: there can also be one already-started task waiting for space to be put into the queue (the await completed_queue.put(task) line blocks if the queue is full), and another already taken out by the queue consumer (fetched by task = await completed_queue.get()). If you need to limit the number of active tasks, subtract 2 from the queue maxsize to set an upper bound.

Also, because tasks can complete in the meantime, there may be up to maxsize + 1 fewer active tasks, yet you cannot start any more until space frees up in the queue. Because the first approach queues the inputs to the tasks, it does not have these issues. You can mitigate the problem by using a semaphore rather than a bounded queue size to limit the tasks (acquire a slot before starting a task, and release a slot just before a task returns), as sketched below.
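
A minimal sketch of that semaphore variant, reusing the fetch_task coroutine from above; the consumer loop stays the same as in the previous snippet:

concurrency = asyncio.Semaphore(queue_size)  # caps the number of active fetch tasks
completed_queue = asyncio.Queue()

async def limited_fetch(ids):
    try:
        return await fetch_task(ids)
    finally:
        # Release the slot just before the task returns.
        concurrency.release()

async def queued_chunks():
    for ids in chunks(entity_ids, chunk_size):
        # Acquire a slot *before* starting the task, so at most
        # queue_size fetches are ever running at once.
        await concurrency.acquire()
        await completed_queue.put(asyncio.create_task(limited_fetch(ids)))
    await completed_queue.put(None)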

Personally, I'd go with my first proposal, as it gives you separate control over concurrency and chunk prefetching, without the issues the second approach has.

answered 2021-01-17T20:58:50.613