Using the new asyncio in Python 3.4, how do I acquire the first available lock/semaphore from a set of locks/semaphores?

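My approach is to use wait(return_when=FIRST_COMPLETED) and then, once I've managed to get one, cancel all the acquire()s that are still pending. But I'm worried this may lead to subtle bugs or race conditions, and I feel there must be a more elegant way to do this.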

import asyncio as aio

@aio.coroutine
def run():
    sem1, sem2 = (aio.Semaphore(), aio.Semaphore())
    print('initial:', sem1, sem2)
    a = aio.async(sleep(sem1, 1)) # acquire sem1
    print('just after sleep:', sem1, sem2)
    done, pending = yield from aio.wait([sem1.acquire(), sem2.acquire()], return_when=aio.FIRST_COMPLETED)
    print('done:', done)
    print('pending:', pending)
    for task in pending:
        task.cancel()
    print('after cancel:', sem1, sem2)
    yield from aio.wait([a])
    print('after wait:', sem1, sem2)

@aio.coroutine
def sleep(sem, i):
    with (yield from sem):
        yield from aio.sleep(i)

if __name__ == "__main__":
    aio.get_event_loop().run_until_complete(run())

The code above gives (memory addresses redacted):

initial: <asyncio.locks.Semaphore object at 0x1 [unlocked,value:1]> <asyncio.locks.Semaphore object at 0x2 [unlocked,value:1]>
just after sleep: <asyncio.locks.Semaphore object at 0x1 [unlocked,value:1]> <asyncio.locks.Semaphore object at 0x2 [unlocked,value:1]>
done: {Task(<acquire>)<result=True>}
pending: {Task(<acquire>)<PENDING>}
after cancel: <asyncio.locks.Semaphore object at 0x1 [locked,waiters:1]> <asyncio.locks.Semaphore object at 0x2 [locked]>
after wait: <asyncio.locks.Semaphore object at 0x1 [unlocked,value:1]> <asyncio.locks.Semaphore object at 0x2 [locked]>
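One way to tighten the wait-then-cancel approach is sketched below. It uses modern async/await syntax (Python 3.7+) rather than the 3.4-era yield from above, and `acquire_first` is a hypothetical helper name, not an asyncio API. The subtle case is that several acquire() calls can finish in the same event-loop iteration, so `done` may hold more than one winner. The sketch keeps one, releases the others, and lets the cancelled acquires finish before it inspects any results.

```python
import asyncio

async def acquire_first(sems):
    """Acquire whichever semaphore in `sems` becomes free first and return it.

    Hypothetical helper. The caller must release the returned semaphore.
    Exactly one semaphore is left held, even if several acquires complete
    in the same event-loop iteration.
    """
    tasks = {asyncio.ensure_future(s.acquire()): s for s in sems}
    done, pending = await asyncio.wait(list(tasks), return_when=asyncio.FIRST_COMPLETED)
    for t in pending:
        t.cancel()
    # Let the cancellations run so each semaphore cleans up its waiter
    # bookkeeping before we look at the outcomes.
    await asyncio.gather(*pending, return_exceptions=True)
    winner = None
    for task, sem in tasks.items():
        if task.cancelled():
            continue  # cancelled while still waiting: nothing was acquired
        if winner is None:
            winner = sem  # keep the first successful acquire
        else:
            sem.release()  # extra successful acquires are handed back
    return winner
```

This sketch does not handle the caller itself being cancelled while asyncio.wait() is pending. In that case the helper tasks keep running and could acquire semaphores that nobody releases.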

1 Answer

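If I understand your question correctly, you want two different pools of locks: one that allows X connections per proxy, and another that allows Y connections globally. A single Semaphore object can actually be used for this quite easily: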

class asyncio.Semaphore(value=1, *, loop=None)

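A semaphore manages an internal counter which is decremented by each acquire() call and incremented by each release() call. The counter can never go below zero; when acquire() finds that it is zero, it blocks, waiting until some other thread calls release().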

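So rather than implementing a pool as a list of Semaphore objects, each initialized with the default value of 1, initialize the value of a single Semaphore object to the maximum number of tasks you want to be able to run concurrently.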
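Here is a minimal sketch of the single-semaphore pool. It uses async/await syntax (Python 3.7+), not the 3.4 syntax used in this thread, and `limit`, `n_tasks` and the `peak` counter are illustrative names, not part of asyncio. Ten workers share one Semaphore(3), and the code records how many workers were inside the `async with` block at the same time.

```python
import asyncio

async def main(limit=3, n_tasks=10):
    sem = asyncio.Semaphore(limit)  # one semaphore acts as a pool of `limit` slots
    active = 0  # workers currently holding a slot
    peak = 0    # highest number of simultaneous holders seen

    async def worker():
        nonlocal active, peak
        async with sem:  # blocks while all `limit` slots are taken
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for real work
            active -= 1

    await asyncio.gather(*(worker() for _ in range(n_tasks)))
    return peak

peak = asyncio.run(main())
print("peak concurrency:", peak)
```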

proxy_sem = asyncio.Semaphore(value=5)  # 5 connections will be able to hold this semaphore concurrently
global_sem = asyncio.Semaphore(value=15)  # 15 connections will be able to hold this semaphore concurrently

Then, in your code, always acquire the proxy semaphore before acquiring the global semaphore:

with (yield from proxy_sem):
    with (yield from global_sem):
        pass  # open and use the connection here, while holding both semaphores

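That way, you won't be holding the global lock while you wait for a proxy-specific lock. Holding it could block a connection through another proxy that would otherwise be free to run if it could get the global lock.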
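Below is a small sketch of that ordering, again in async/await syntax (Python 3.7+). The proxy names, the two limits and the peak counters are made up for illustration. Each connection takes its proxy's semaphore first and only then the global one. The code records the highest per-proxy and total concurrency it observed.

```python
import asyncio

PROXY_LIMIT = 2   # hypothetical per-proxy connection limit
GLOBAL_LIMIT = 3  # hypothetical global connection limit

async def run(proxies, jobs_per_proxy=4):
    global_sem = asyncio.Semaphore(GLOBAL_LIMIT)
    proxy_sems = {p: asyncio.Semaphore(PROXY_LIMIT) for p in proxies}
    active = {p: 0 for p in proxies}  # connections currently open per proxy
    peak = {p: 0 for p in proxies}    # highest per-proxy concurrency seen
    peak_global = 0                   # highest total concurrency seen

    async def connect(proxy):
        nonlocal peak_global
        # Proxy slot first: while waiting here we hold no global slot,
        # so connections through other proxies can keep using the global pool.
        async with proxy_sems[proxy]:
            async with global_sem:
                active[proxy] += 1
                peak[proxy] = max(peak[proxy], active[proxy])
                peak_global = max(peak_global, sum(active.values()))
                await asyncio.sleep(0.01)  # stand-in for real network I/O
                active[proxy] -= 1

    await asyncio.gather(*(connect(p) for p in proxies for _ in range(jobs_per_proxy)))
    return peak, peak_global

per_proxy, overall = asyncio.run(run(["proxy-a", "proxy-b"]))
print(per_proxy, overall)
```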

Edit:

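Here is a complete example that demonstrates an approach that doesn't need proxy-specific locks at all. Instead, you run one coroutine per proxy, all consuming from the same queue. Each proxy coroutine limits the number of concurrent tasks it runs by keeping track of the active tasks it has started, and it only starts a new task when it is below the limit. When a proxy coroutine starts a task, that task is responsible for acquiring the global semaphore. Here's the code: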

import asyncio
import random

PROXY_CONN_LIMIT = 5
GLOBAL_CONN_LIMIT = 20
PROXIES = ['1.2.3.4', '1.1.1.1', '2.2.2.2', '3.3.3.3', '4.4.4.4']

@asyncio.coroutine
def do_network_stuff(item, proxy_info):
    print("Got {}. Handling it with proxy {}".format(item, proxy_info))
    # Wait a random amount of time to simulate actual work being done.
    yield from asyncio.sleep(random.randint(1,7))

@asyncio.coroutine
def handle_item(item, proxy_info, global_sem):
    with (yield from global_sem):  # Get the global semaphore
        yield from do_network_stuff(item, proxy_info)

@asyncio.coroutine
def proxy_pool(proxy_info, queue, global_sem):
    tasks = []
    while True:  # Loop infinitely. We'll return when we get a sentinel from main()
        while len(tasks) < PROXY_CONN_LIMIT: # Pull from the queue until we hit our proxy limit
            item = yield from queue.get()
            print(len(tasks))  # debug: how many tasks this pool is already running
            if item is None:  # Time to shut down
                if tasks:
                    # Make sure all pending tasks are finished first.
                    yield from asyncio.wait(tasks)
                print("Shutting down {}".format(proxy_info))
                return
            # Create a task for the work item, and add it to our list of
            # tasks.
            task = asyncio.async(handle_item(item, proxy_info, global_sem))
            tasks.append(task)
        # We've hit our proxy limit. Now we wait for at least one task
        # to complete, then loop around to pull more from the queue.
        done, pending = yield from asyncio.wait(tasks,
                                                return_when=asyncio.FIRST_COMPLETED) 
        # Remove the completed tasks from the active tasks list.
        for d in done:
            tasks.remove(d)

@asyncio.coroutine
def main():
    global_sem = asyncio.Semaphore(GLOBAL_CONN_LIMIT)
    queue = asyncio.Queue()
    tasks = []
    # Start the proxy pools.
    for proxy in PROXIES:
        tasks.append(asyncio.async(proxy_pool(proxy, queue, global_sem)))

    # Send work to the proxy pools.
    for i in range(50):
        yield from queue.put(i)

    # Tell the proxy pools to shut down.
    for _ in PROXIES:
        yield from queue.put(None)

    # Wait for them to shut down.
    yield from asyncio.wait(tasks)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Sample output:

0
1
2
3
4
0
1
2
3
4
0
1
2
3
4
0
1
2
3
4
0
1
2
3
4
Got 0. Handling it with proxy 1.2.3.4
Got 1. Handling it with proxy 1.2.3.4
Got 2. Handling it with proxy 1.2.3.4
Got 3. Handling it with proxy 1.2.3.4
Got 4. Handling it with proxy 1.2.3.4
Got 5. Handling it with proxy 1.1.1.1
Got 6. Handling it with proxy 1.1.1.1
Got 7. Handling it with proxy 1.1.1.1
Got 8. Handling it with proxy 1.1.1.1
Got 9. Handling it with proxy 1.1.1.1
Got 10. Handling it with proxy 2.2.2.2
Got 11. Handling it with proxy 2.2.2.2
Got 12. Handling it with proxy 2.2.2.2
Got 13. Handling it with proxy 2.2.2.2
Got 14. Handling it with proxy 2.2.2.2
Got 15. Handling it with proxy 3.3.3.3
Got 16. Handling it with proxy 3.3.3.3
Got 17. Handling it with proxy 3.3.3.3
Got 18. Handling it with proxy 3.3.3.3
Got 19. Handling it with proxy 3.3.3.3
Got 20. Handling it with proxy 4.4.4.4
Got 21. Handling it with proxy 4.4.4.4
Got 22. Handling it with proxy 4.4.4.4
Got 23. Handling it with proxy 4.4.4.4
4
4
4
4
Got 24. Handling it with proxy 4.4.4.4
Got 25. Handling it with proxy 1.2.3.4
Got 26. Handling it with proxy 2.2.2.2
Got 27. Handling it with proxy 1.1.1.1
Got 28. Handling it with proxy 3.3.3.3
3
4
4
4
4
4
Got 29. Handling it with proxy 4.4.4.4
Got 30. Handling it with proxy 4.4.4.4
Got 31. Handling it with proxy 2.2.2.2
Got 32. Handling it with proxy 1.1.1.1
4
4
4
Got 33. Handling it with proxy 1.2.3.4
Got 34. Handling it with proxy 3.3.3.3
Got 35. Handling it with proxy 1.1.1.1
Got 36. Handling it with proxy 2.2.2.2
Got 37. Handling it with proxy 3.3.3.3
3
4
4
4
4
Got 38. Handling it with proxy 1.2.3.4
4
Got 39. Handling it with proxy 1.2.3.4
Got 40. Handling it with proxy 2.2.2.2
Got 41. Handling it with proxy 1.1.1.1
Got 42. Handling it with proxy 3.3.3.3
Got 43. Handling it with proxy 4.4.4.4
2
3
4
4
4
4
Got 44. Handling it with proxy 1.2.3.4
Got 45. Handling it with proxy 1.2.3.4
Got 46. Handling it with proxy 1.2.3.4
Got 47. Handling it with proxy 1.1.1.1
Got 48. Handling it with proxy 4.4.4.4
Got 49. Handling it with proxy 2.2.2.2
3
4
4
4
Shutting down 3.3.3.3
4
Shutting down 2.2.2.2
Shutting down 1.1.1.1
Shutting down 4.4.4.4
Shutting down 1.2.3.4
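The example above targets Python 3.4. `asyncio.async` cannot even be written since `async` became a reserved keyword in Python 3.7, and `@asyncio.coroutine` was removed in Python 3.11. For readers on current Python, here is a rough port of the same structure (one pool coroutine per proxy, a shared queue, and the global semaphore taken by each task) to async/await syntax (Python 3.7+). It is an adaptation, not the answer's original code. The limits, proxy list and item count are reduced so it finishes quickly, the debug print is dropped, and the `handled`/`active`/`peak` bookkeeping exists only so the run can be checked.

```python
import asyncio
import random

PROXY_CONN_LIMIT = 2   # reduced limits so the demo runs quickly
GLOBAL_CONN_LIMIT = 3
PROXIES = ["1.2.3.4", "1.1.1.1", "2.2.2.2"]

handled = []  # (item, proxy) pairs, recorded so the run can be inspected
active = 0    # tasks currently holding the global semaphore
peak = 0      # highest value `active` reached

async def do_network_stuff(item, proxy_info):
    global active, peak
    active += 1
    peak = max(peak, active)
    handled.append((item, proxy_info))
    # Sleep a short random time to simulate actual work.
    await asyncio.sleep(random.uniform(0, 0.02))
    active -= 1

async def handle_item(item, proxy_info, global_sem):
    async with global_sem:  # the task, not the pool, takes the global slot
        await do_network_stuff(item, proxy_info)

async def proxy_pool(proxy_info, queue, global_sem):
    tasks = set()
    while True:
        # Pull from the queue until this proxy reaches its own limit.
        while len(tasks) < PROXY_CONN_LIMIT:
            item = await queue.get()
            if item is None:  # sentinel: finish outstanding work, then stop
                if tasks:
                    await asyncio.wait(tasks)
                return
            tasks.add(asyncio.create_task(handle_item(item, proxy_info, global_sem)))
        # At the limit: wait for at least one task to finish and keep the rest.
        _done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

async def main(n_items=20):
    global_sem = asyncio.Semaphore(GLOBAL_CONN_LIMIT)
    queue = asyncio.Queue()
    pools = [asyncio.create_task(proxy_pool(p, queue, global_sem)) for p in PROXIES]
    for i in range(n_items):
        await queue.put(i)
    for _ in PROXIES:  # one shutdown sentinel per pool
        await queue.put(None)
    await asyncio.gather(*pools)

asyncio.run(main())
print(len(handled), "items handled; peak global concurrency:", peak)
```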
answered 2014-08-17 20:13