0

我正在与 trio 一起运行异步并发任务,该任务将在不同的网站上进行一些网络抓取。我希望能够选择与多少并发工作人员一起分配任务。为此,我编写了这段代码

async def run_task():
    s = trio.Session(connections=5)
    Total_to_check = to_check() / int(module().workers)
    line = 0
    if int(Total_to_check) < 1:
        Total_to_check = 1
        module().workers = int(to_check())
    for i in range(int(Total_to_check)):
        try:
            async with trio.open_nursery() as nursery:
                for x in range(int(module().workers)):                  
                        nursery.start_soon(python_worker, self, s, x, line)
                        line += 1
                            
    
        except BlockingIOError as e:
            print("[Fatal Error]", str(e))
            continue            

在此示例to_check()中,等于提供了多少个 url 来从中获取数据,并且module().workers等于我想使用多少个并发工作人员。

因此,如果我假设我有 30 个 url,并且我输入我想要 10 个并发任务,它将同时从 10 个 url 获取数据并重复该过程 3 次。

现在这一切都很好,直到我Total_to_check(等于 url 的数量除以工人的数量)是小数。如果我假设 15 个 url 并且我要求 10 个工人,那么这段代码只会检查 10 个 url。如果我有 20 个网址但要求 15 名工人也是如此。我可以做类似 math.ceil(Total_to_check) 的事情,但它会开始尝试检查不存在的网址。

我怎样才能使它正常工作,所以如果我有 10 个并发任务和 15 个 url,它将同时检查前 10 个,然后同时检查最后 5 个而不跳过 url?(或试图检查太多)

谢谢!

4

1 回答 1

2

好吧,你可以像这样使用CapacityLimiter :

async def python_worker(self, session, workers, line, limit):
    async with limit:
        ...

然后你可以简化你的run_task

async def run_task():
    limit = trio.CapacityLimiter(10)
    s = trio.Session(connections=5)
    line = 0
    async with trio.open_nursery() as nursery:
        for x in range(int(to_check())):
            nursery.start_soon(python_worker, self, s, x, line, limit)
            line += 1      

我相信 theBlockingIOError也必须移动到里面python_worker,因为nursery.start_soon()它不会阻塞,它会自动__aexit__等待块的末尾。nurseryasync with trio.open_nursery() as nursery

于 2019-06-21T07:29:20.750 回答