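If I'm understanding your question correctly, you want two different pools of locks: one that allows X connections per proxy, and another that allows Y connections globally. A single Semaphore object can actually be used for this quite easily: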
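class asyncio.Semaphore(value=1, *, loop=None)
A semaphore manages an internal counter which is decremented by each acquire() call and incremented by each release() call. The counter can never go below zero; when acquire() finds that it is zero, it blocks, waiting until some other coroutine calls release().
(The loop argument in that signature was deprecated in Python 3.8 and removed in 3.10; on current versions the constructor is simply asyncio.Semaphore(value=1).)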
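To make the counter behaviour concrete, here is a minimal sketch that walks one semaphore through acquire() and release() and records what it observes. It only uses acquire(), release() and locked() from asyncio.Semaphore; the function name counter_demo and the starting value of 2 are arbitrary choices for illustration.

```python
import asyncio


async def counter_demo():
    """Walk an asyncio.Semaphore through acquire/release and record its state."""
    seen = {}
    sem = asyncio.Semaphore(2)              # internal counter starts at 2

    await sem.acquire()                     # counter 2 -> 1
    await sem.acquire()                     # counter 1 -> 0
    seen["locked_at_zero"] = sem.locked()   # True: another acquire() would block

    # A third acquire() cannot proceed while the counter is zero.
    waiter = asyncio.create_task(sem.acquire())
    await asyncio.sleep(0)                  # give the waiter a chance to run
    seen["waiter_blocked"] = not waiter.done()

    sem.release()                           # frees a slot and wakes the waiter
    await waiter
    seen["waiter_acquired"] = waiter.result()

    sem.release()                           # release the slot we kept...
    sem.release()                           # ...and the one the waiter took
    seen["locked_at_end"] = sem.locked()
    return seen


if __name__ == "__main__":
    print(asyncio.run(counter_demo()))
    # {'locked_at_zero': True, 'waiter_blocked': True, 'waiter_acquired': True, 'locked_at_end': False}
```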
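So, rather than implementing your pools as a list of Semaphore objects, each initialized with the default value of 1, just initialize the value of a single Semaphore object to the maximum number of tasks you want to be able to run concurrently: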
```python
import asyncio

proxy_sem = asyncio.Semaphore(value=5)    # 5 connections will be able to hold this semaphore concurrently
global_sem = asyncio.Semaphore(value=15)  # 15 connections will be able to hold this semaphore concurrently
```
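One way to convince yourself that a single semaphore initialised to N behaves like a pool of N slots: the sketch below starts more tasks than there are slots and records how many are inside the semaphore at the same moment. The function name peak_concurrency, the task counts and the 10 ms sleep are illustrative assumptions, not part of the original answer.

```python
import asyncio


async def peak_concurrency(limit, n_tasks):
    """Start n_tasks coroutines behind one Semaphore(limit) and return the
    peak number that were inside the semaphore at the same time."""
    sem = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def worker():
        nonlocal active, peak
        async with sem:                 # at most `limit` workers get past this line
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)   # stand-in for a network call
            active -= 1

    await asyncio.gather(*(worker() for _ in range(n_tasks)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(peak_concurrency(limit=5, n_tasks=20)))  # 5
```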
Then, in your code, always acquire the proxy semaphore before acquiring the global one:
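```python
# Inside a coroutine:
async with proxy_sem:       # wait for a free slot on this proxy first...
    async with global_sem:  # ...then for a free global slot
        ...                 # make the connection here
```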
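This way, you won't be holding the global lock while you wait for a proxy-specific lock. Holding it could block a connection through another proxy, one that would be free to run if it could get the global lock.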
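Putting the two semaphores together, here is a minimal runnable sketch of that acquisition order. It assumes one semaphore per proxy, kept in a dict keyed by proxy address (the snippet above shows only a single proxy_sem); the names run_jobs, fetch, PROXY_LIMIT and GLOBAL_LIMIT are hypothetical. With four proxies at 5 connections each but a global cap of 15, the global semaphore is what ends up limiting throughput.

```python
import asyncio
from collections import defaultdict

PROXY_LIMIT = 5     # per-proxy connection cap (assumed value)
GLOBAL_LIMIT = 15   # overall connection cap (assumed value)


async def run_jobs(jobs):
    """jobs is a list of proxy addresses, one entry per connection to make.
    Returns (peak connections per proxy, peak connections overall)."""
    proxy_sems = defaultdict(lambda: asyncio.Semaphore(PROXY_LIMIT))
    global_sem = asyncio.Semaphore(GLOBAL_LIMIT)
    active = defaultdict(int)
    peak_proxy = defaultdict(int)
    total = 0
    peak_total = 0

    async def fetch(proxy):
        nonlocal total, peak_total
        # Proxy semaphore first: while we queue for a busy proxy we hold
        # no global slot, so other proxies can keep using theirs.
        async with proxy_sems[proxy]:
            async with global_sem:
                active[proxy] += 1
                total += 1
                peak_proxy[proxy] = max(peak_proxy[proxy], active[proxy])
                peak_total = max(peak_total, total)
                await asyncio.sleep(0.01)   # stand-in for the real connection
                active[proxy] -= 1
                total -= 1

    await asyncio.gather(*(fetch(p) for p in jobs))
    return dict(peak_proxy), peak_total


if __name__ == "__main__":
    jobs = ["1.1.1.1"] * 20 + ["2.2.2.2"] * 20 + ["3.3.3.3"] * 20 + ["4.4.4.4"] * 20
    per_proxy, overall = asyncio.run(run_jobs(jobs))
    print(per_proxy, overall)
```

Because a task asks for the global semaphore only once it already holds its proxy's semaphore, tasks stuck behind a busy proxy never occupy a global slot, so the global cap is shared among connections that can actually run.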
Edit:
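Here's a complete example that demonstrates a way to do this without needing proxy-specific locks at all. Instead, you run one coroutine per proxy, all consuming from the same queue. Each proxy coroutine limits the number of concurrent tasks it runs by keeping track of the active tasks it has launched, and only starts new tasks while it is below the limit. When a proxy coroutine launches a task, that task is responsible for acquiring the global semaphore. Here's the code: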
```python
# Requires Python 3.7+ (async/await, asyncio.create_task, asyncio.run).
import asyncio
import random

PROXY_CONN_LIMIT = 5
GLOBAL_CONN_LIMIT = 20
PROXIES = ['1.2.3.4', '1.1.1.1', '2.2.2.2', '3.3.3.3', '4.4.4.4']


async def do_network_stuff(item, proxy_info):
    print("Got {}. Handling it with proxy {}".format(item, proxy_info))
    # Wait a random amount of time to simulate actual work being done.
    await asyncio.sleep(random.randint(1, 7))


async def handle_item(item, proxy_info, global_sem):
    async with global_sem:  # Get the global semaphore
        await do_network_stuff(item, proxy_info)


async def proxy_pool(proxy_info, queue, global_sem):
    tasks = []
    while True:  # Loop infinitely. We'll return when we get a sentinel from main()
        while len(tasks) < PROXY_CONN_LIMIT:  # Pull from the queue until we hit our proxy limit
            item = await queue.get()
            print(len(tasks))  # Debug output: number of tasks this pool is tracking
            if item is None:  # Time to shut down
                if tasks:
                    # Make sure all pending tasks are finished first.
                    await asyncio.wait(tasks)
                print("Shutting down {}".format(proxy_info))
                return
            # Create a task for the work item, and add it to our list of
            # tasks.
            task = asyncio.create_task(handle_item(item, proxy_info, global_sem))
            tasks.append(task)
        # We've hit our proxy limit. Now we wait for at least one task
        # to complete, then loop around to pull more from the queue.
        done, pending = await asyncio.wait(tasks,
                                           return_when=asyncio.FIRST_COMPLETED)
        # Remove the completed tasks from the active tasks list.
        for d in done:
            tasks.remove(d)


async def main():
    global_sem = asyncio.Semaphore(GLOBAL_CONN_LIMIT)
    queue = asyncio.Queue()
    tasks = []

    # Start the proxy pools.
    for proxy in PROXIES:
        tasks.append(asyncio.create_task(proxy_pool(proxy, queue, global_sem)))

    # Send work to the proxy pools.
    for i in range(50):
        await queue.put(i)

    # Tell the proxy pools to shut down (one sentinel per pool).
    for _ in PROXIES:
        await queue.put(None)

    # Wait for them to shut down.
    await asyncio.wait(tasks)


if __name__ == "__main__":
    asyncio.run(main())
```
Sample output (the bare numbers come from the print(len(tasks)) debug line in proxy_pool; because the sleep times are random, the interleaving will differ from run to run):
```
0
1
2
3
4
0
1
2
3
4
0
1
2
3
4
0
1
2
3
4
0
1
2
3
4
Got 0. Handling it with proxy 1.2.3.4
Got 1. Handling it with proxy 1.2.3.4
Got 2. Handling it with proxy 1.2.3.4
Got 3. Handling it with proxy 1.2.3.4
Got 4. Handling it with proxy 1.2.3.4
Got 5. Handling it with proxy 1.1.1.1
Got 6. Handling it with proxy 1.1.1.1
Got 7. Handling it with proxy 1.1.1.1
Got 8. Handling it with proxy 1.1.1.1
Got 9. Handling it with proxy 1.1.1.1
Got 10. Handling it with proxy 2.2.2.2
Got 11. Handling it with proxy 2.2.2.2
Got 12. Handling it with proxy 2.2.2.2
Got 13. Handling it with proxy 2.2.2.2
Got 14. Handling it with proxy 2.2.2.2
Got 15. Handling it with proxy 3.3.3.3
Got 16. Handling it with proxy 3.3.3.3
Got 17. Handling it with proxy 3.3.3.3
Got 18. Handling it with proxy 3.3.3.3
Got 19. Handling it with proxy 3.3.3.3
Got 20. Handling it with proxy 4.4.4.4
Got 21. Handling it with proxy 4.4.4.4
Got 22. Handling it with proxy 4.4.4.4
Got 23. Handling it with proxy 4.4.4.4
4
4
4
4
Got 24. Handling it with proxy 4.4.4.4
Got 25. Handling it with proxy 1.2.3.4
Got 26. Handling it with proxy 2.2.2.2
Got 27. Handling it with proxy 1.1.1.1
Got 28. Handling it with proxy 3.3.3.3
3
4
4
4
4
4
Got 29. Handling it with proxy 4.4.4.4
Got 30. Handling it with proxy 4.4.4.4
Got 31. Handling it with proxy 2.2.2.2
Got 32. Handling it with proxy 1.1.1.1
4
4
4
Got 33. Handling it with proxy 1.2.3.4
Got 34. Handling it with proxy 3.3.3.3
Got 35. Handling it with proxy 1.1.1.1
Got 36. Handling it with proxy 2.2.2.2
Got 37. Handling it with proxy 3.3.3.3
3
4
4
4
4
Got 38. Handling it with proxy 1.2.3.4
4
Got 39. Handling it with proxy 1.2.3.4
Got 40. Handling it with proxy 2.2.2.2
Got 41. Handling it with proxy 1.1.1.1
Got 42. Handling it with proxy 3.3.3.3
Got 43. Handling it with proxy 4.4.4.4
2
3
4
4
4
4
Got 44. Handling it with proxy 1.2.3.4
Got 45. Handling it with proxy 1.2.3.4
Got 46. Handling it with proxy 1.2.3.4
Got 47. Handling it with proxy 1.1.1.1
Got 48. Handling it with proxy 4.4.4.4
Got 49. Handling it with proxy 2.2.2.2
3
4
4
4
Shutting down 3.3.3.3
4
Shutting down 2.2.2.2
Shutting down 1.1.1.1
Shutting down 4.4.4.4
Shutting down 1.2.3.4
```
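Rather than eyeballing printed output, the limits in the queue-based design can be checked directly. The sketch below is an instrumented copy of the example with a hypothetical Stats object passed through to handle_item, which counts open connections per proxy and overall. The debug prints are dropped, the sleeps are shortened to at most 10 ms and the item count is raised to 200; those are arbitrary choices to make it run quickly.

```python
import asyncio
import random

PROXY_CONN_LIMIT = 5
GLOBAL_CONN_LIMIT = 20
PROXIES = ['1.2.3.4', '1.1.1.1', '2.2.2.2', '3.3.3.3', '4.4.4.4']


class Stats:
    """Hypothetical helper: counts connections currently open, per proxy and overall."""
    def __init__(self):
        self.active = {p: 0 for p in PROXIES}
        self.peak = {p: 0 for p in PROXIES}
        self.total = 0
        self.peak_total = 0
        self.handled = []


async def handle_item(item, proxy, global_sem, stats):
    async with global_sem:  # same global limit as the original example
        stats.active[proxy] += 1
        stats.total += 1
        stats.peak[proxy] = max(stats.peak[proxy], stats.active[proxy])
        stats.peak_total = max(stats.peak_total, stats.total)
        await asyncio.sleep(random.random() / 100)  # short fake "network" delay
        stats.active[proxy] -= 1
        stats.total -= 1
        stats.handled.append(item)


async def proxy_pool(proxy, queue, global_sem, stats):
    tasks = []
    while True:
        while len(tasks) < PROXY_CONN_LIMIT:  # per-proxy limit via task tracking
            item = await queue.get()
            if item is None:  # sentinel: drain our tasks, then stop
                if tasks:
                    await asyncio.wait(tasks)
                return
            tasks.append(asyncio.create_task(handle_item(item, proxy, global_sem, stats)))
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for d in done:
            tasks.remove(d)


async def main(n_items=200):
    global_sem = asyncio.Semaphore(GLOBAL_CONN_LIMIT)
    queue = asyncio.Queue()
    stats = Stats()
    pools = [asyncio.create_task(proxy_pool(p, queue, global_sem, stats)) for p in PROXIES]
    for i in range(n_items):
        await queue.put(i)
    for _ in PROXIES:
        await queue.put(None)
    await asyncio.wait(pools)
    return stats


if __name__ == "__main__":
    s = asyncio.run(main())
    print("peak per proxy:", s.peak)
    print("peak overall:", s.peak_total)
```

With five proxies at 5 connections each but a global cap of 20, the per-proxy peaks should never exceed 5 and the overall peak should never exceed 20.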