Below is the complete code, simplified for this question.

ids_to_check returns a list of ids. For my tests, I used a list of 13 random strings.

#!/usr/bin/env python3
import time
from multiprocessing.dummy import Pool as ThreadPool, current_process as threadpool_process
import requests

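def ids_to_check():
    some_calls()  # placeholder for the calls that build id_list
    return id_list

def execute_task(task_id):
    # Parameter renamed from `id` so the built-in id() below stays callable.
    url = f"https://myserver.com/todos/{task_id}"
    json_op = s.get(url, verify=False).json()
    value = json_op['id']
    print(str(value) + '-' + str(threadpool_process()) + str(id(s)))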

def main():
    pool = ThreadPool(processes=20)
    while True:
        pool.map(execute_task, ids_to_check())
        print("Let's wait for 10 seconds")
        time.sleep(10)

if __name__ == "__main__":
    s = requests.Session()
    # headers.update is a method: call it instead of assigning over it.
    s.headers.update({
        'Accept': 'application/json'
    })

    main()

Output:

4-<DummyProcess(Thread-2, started daemon 140209222559488)>140209446508360
5-<DummyProcess(Thread-5, started daemon 140209123481344)>140209446508360
7-<DummyProcess(Thread-6, started daemon 140209115088640)>140209446508360
2-<DummyProcess(Thread-11, started daemon 140208527894272)>140209446508360
None-<DummyProcess(Thread-1, started daemon 140209230952192)>140209446508360
10-<DummyProcess(Thread-4, started daemon 140209131874048)>140209446508360
12-<DummyProcess(Thread-7, started daemon 140209106695936)>140209446508360
8-<DummyProcess(Thread-3, started daemon 140209140266752)>140209446508360
6-<DummyProcess(Thread-12, started daemon 140208519501568)>140209446508360
3-<DummyProcess(Thread-13, started daemon 140208511108864)>140209446508360
11-<DummyProcess(Thread-10, started daemon 140208536286976)>140209446508360
9-<DummyProcess(Thread-9, started daemon 140209089910528)>140209446508360
1-<DummyProcess(Thread-8, started daemon 140209098303232)>140209446508360
Let's wait for 10 seconds
None-<DummyProcess(Thread-14, started daemon 140208502716160)>140209446508360
3-<DummyProcess(Thread-20, started daemon 140208108455680)>140209446508360
1-<DummyProcess(Thread-19, started daemon 140208116848384)>140209446508360
7-<DummyProcess(Thread-17, started daemon 140208133633792)>140209446508360
6-<DummyProcess(Thread-6, started daemon 140209115088640)>140209446508360
4-<DummyProcess(Thread-4, started daemon 140209131874048)>140209446508360
9-<DummyProcess(Thread-16, started daemon 140208485930752)>140209446508360
5-<DummyProcess(Thread-15, started daemon 140208494323456)>140209446508360
2-<DummyProcess(Thread-2, started daemon 140209222559488)>140209446508360
8-<DummyProcess(Thread-18, started daemon 140208125241088)>140209446508360
11-<DummyProcess(Thread-1, started daemon 140209230952192)>140209446508360
10-<DummyProcess(Thread-11, started daemon 140208527894272)>140209446508360
12-<DummyProcess(Thread-5, started daemon 140209123481344)>140209446508360
Let's wait for 10 seconds
None-<DummyProcess(Thread-3, started daemon 140209140266752)>140209446508360
2-<DummyProcess(Thread-10, started daemon 140208536286976)>140209446508360
1-<DummyProcess(Thread-12, started daemon 140208519501568)>140209446508360
4-<DummyProcess(Thread-9, started daemon 140209089910528)>140209446508360
5-<DummyProcess(Thread-14, started daemon 140208502716160)>140209446508360
9-<DummyProcess(Thread-6, started daemon 140209115088640)>140209446508360
8-<DummyProcess(Thread-16, started daemon 140208485930752)>140209446508360
7-<DummyProcess(Thread-4, started daemon 140209131874048)>140209446508360
3-<DummyProcess(Thread-20, started daemon 140208108455680)>140209446508360
6-<DummyProcess(Thread-8, started daemon 140209098303232)>140209446508360
12-<DummyProcess(Thread-13, started daemon 140208511108864)>140209446508360
10-<DummyProcess(Thread-7, started daemon 140209106695936)>140209446508360
11-<DummyProcess(Thread-19, started daemon 140208116848384)>140209446508360
Let's wait for 10 seconds
.
.

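My observations:

  • Multiple connections are created (i.e. one connection per process), but the session object is the same throughout the code's execution (the session object ID is identical).
  • From the ss output, the connections keep getting recycled. I could not identify any particular pattern or timeout for the recycling.
  • If I reduce the number of processes to a smaller number (e.g. 5), the connections are not recycled.

I don't understand how or why the connections are being recycled, or why they are not recycled when I reduce the number of processes. I have tried disabling the garbage collector with import gc; gc.disable() and the connections are still recycled.

I want the created connections to stay alive until the maximum number of requests is reached. I think this could work without a session, using the keep-alive connection header.

But I'm curious to know what causes these session connections to keep getting recycled when the process pool size is high.

I can reproduce this against any server, so it probably does not depend on the server.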
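
One way to watch the pool directly, rather than inferring its behavior from ss, is to turn on logging for urllib3, the library requests uses for its connections. The sketch below assumes a requests version that relies on the standalone urllib3 package (so the logger is named "urllib3"); the helper name enable_pool_logging is made up for illustration. With it enabled, urllib3 should report each new connection it opens at DEBUG level and emit a warning whenever it discards a connection because the pool is full.

```python
import logging

def enable_pool_logging(level=logging.DEBUG):
    """Print urllib3 log records to stderr so pool activity is visible.

    Hypothetical helper, not part of requests or urllib3.
    """
    # Attach a stderr handler to the root logger; thread name helps
    # correlate log lines with the worker that made the request.
    logging.basicConfig(
        format="%(asctime)s %(threadName)s %(name)s %(message)s"
    )
    # Assumption: requests uses the standalone urllib3 package, whose
    # loggers live under the "urllib3" name.
    logger = logging.getLogger("urllib3")
    logger.setLevel(level)
    return logger
```

Calling enable_pool_logging() once before main() would be enough; the records from all worker threads go through the same handler.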

1 Answer

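I solved the same problem by creating a session for each process and executing the requests in parallel. At first I also used multiprocessing.dummy, but I ran into the same problem as you and switched to concurrent.futures.thread.ThreadPoolExecutor.

Here is my solution.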

from concurrent.futures.thread import ThreadPoolExecutor
from functools import partial

from requests import Session, Response
from requests.adapters import HTTPAdapter

def thread_pool_execute(iterables, method, pool_size=30) -> list:
    """Run requests in a thread pool, returns list of responses."""
    session = Session()
    session.mount('https://', HTTPAdapter(pool_maxsize=pool_size))  # that's it
    session.mount('http://', HTTPAdapter(pool_maxsize=pool_size))  # that's it    
    worker = partial(method, session)
    with ThreadPoolExecutor(pool_size) as pool:
        results = pool.map(worker, iterables)
    session.close()
    return list(results)

def simple_request(session, url) -> Response:
    return session.get(url)

response_list = thread_pool_execute(list_of_urls, simple_request)

I tested it on a sitemap with 200k URLs and pool_size=150 without any problems. It is limited only by the target host's configuration.
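
Why pool_maxsize matters can be illustrated with a toy model. requests' HTTPAdapter defaults to pool_maxsize=10, and urllib3 keeps idle connections in a bounded queue: when a thread hands a connection back and the queue is already full, that connection is dropped, and a later request has to open a new one. That is likely what shows up as "recycling" with 20 threads but not with 5. The sketch below is not urllib3's code; ToyPool and run_round are invented names, integers stand in for sockets, and a barrier forces every worker to hold a connection at the same moment so the counts are reproducible.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

class ToyPool:
    """Toy stand-in for a bounded connection pool (not urllib3's class)."""

    def __init__(self, maxsize):
        self._idle = queue.LifoQueue(maxsize)  # idle connections, bounded
        self._lock = threading.Lock()
        self.created = 0     # connections opened so far
        self.discarded = 0   # connections dropped because the pool was full

    def acquire(self):
        try:
            return self._idle.get(block=False)   # reuse an idle connection
        except queue.Empty:
            with self._lock:
                self.created += 1
                return self.created              # an int stands in for a socket

    def release(self, conn):
        try:
            self._idle.put(conn, block=False)    # keep it for reuse
        except queue.Full:
            with self._lock:
                self.discarded += 1              # pool full: drop it

def run_round(pool, workers):
    """One batch of `workers` simultaneous requests against the pool."""
    barrier = threading.Barrier(workers)

    def task(_):
        conn = pool.acquire()
        barrier.wait()       # wait until every worker holds a connection
        pool.release(conn)

    with ThreadPoolExecutor(workers) as executor:
        list(executor.map(task, range(workers)))
```

In this model, two rounds of 20 workers against ToyPool(maxsize=10) open 30 connections and discard 20, while two rounds of 5 workers open 5 and discard none. Raising maxsize to the number of workers, which is what pool_maxsize=pool_size does in the solution above, removes the discards.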

answered 2020-12-27T14:16:11.163