The advantage of ProcessPoolExecutor is that it isn't affected by the GIL. With ThreadPoolExecutor, the GIL will keep more than one thread from running at a time, unless you're doing I/O. The good news is that it looks like both of your threads will be doing mostly I/O, but any kind of processing that happens in each thread before or after the web service call won't truly happen concurrently, and that will hurt your performance. ProcessPoolExecutor won't have that limitation, but it adds the overhead of sending the group_id and host data between processes. If you have tens of thousands of hosts, sending them between processes one at a time will add quite a bit of overhead.
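To get a rough feel for the serialization side of that overhead, here's a small sketch (mine, with made-up host names; note that in practice the per-message round-trip cost usually matters more than the raw byte count) comparing what gets pickled when hosts are sent one at a time versus as a single batch:

```python
import pickle

hosts = ["host{}".format(i) for i in range(10000)]

# Bytes pickled if each host is submitted to the pool individually:
# every message pays its own pickle framing overhead.
per_item_bytes = sum(len(pickle.dumps(h)) for h in hosts)

# Bytes pickled if the whole list is submitted as one chunk.
batch_bytes = len(pickle.dumps(hosts))

print("per-item: {} bytes, batched: {} bytes".format(per_item_bytes, batch_bytes))
```

The batched form is smaller, and more importantly it turns thousands of IPC round trips into one.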
I don't think that change alone will affect performance much, though, because in the end you're still sending each host to a thread one at a time for processing.
As for number 3, if your worker threads really do almost nothing but I/O, this approach may work fine. But with threads, any CPU-bound work in the workers will kill your performance. I took your exact program layout and implemented your two workers like this:
def call_ws_1(group_id):
    return list(range(20))

def call_ws_2(host):
    sum(range(33000000))  # CPU-bound
    # time.sleep(1)       # I/O-bound
    return "{} property".format(host)
and executed everything like this:
if __name__ == "__main__":
    start = time.time()
    fetch_hosts(['a', 'b', 'c', 'd', 'e'])
    end = time.time()
    print("Total time: {}".format(end - start))
Using time.sleep, the output is:
Fetching hosts for d
Fetching hosts for a
Fetching hosts for c
Fetching hosts for b
Fetching hosts for e
Total time: 25.051292896270752
Using the sum(range(33000000)) computation instead, performance is much worse:
Fetching hosts for d
Fetching hosts for a
Fetching hosts for c
Fetching hosts for b
Fetching hosts for e
Total time: 75.81612730026245
Note that on my laptop the computation takes about one second:
>>> timeit.timeit("sum(range(33000000))", number=1)
1.023313045501709
>>> timeit.timeit("sum(range(33000000))", number=1)
1.029937982559204
So each worker takes about one second. But because one is CPU-bound, and therefore affected by the GIL, the threaded version performs terribly.
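To see that GIL effect in isolation, here's a minimal sketch (mine, not from the question) that runs the same CPU-bound function sequentially and through a ThreadPoolExecutor. The results are identical, but the threads give little or no speedup:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def cpu_bound(n):
    # Pure-Python computation; the thread holds the GIL while it runs.
    return sum(range(n))

args = [5000000] * 4

start = time.time()
sequential = [cpu_bound(n) for n in args]
seq_time = time.time() - start

start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
    threaded = list(executor.map(cpu_bound, args))
thr_time = time.time() - start

# Results match; for CPU-bound work the threaded wall time is roughly
# the same as (or worse than) running sequentially.
print("sequential: {:.2f}s threaded: {:.2f}s".format(seq_time, thr_time))
```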
Here's a ProcessPoolExecutor using time.sleep:
Fetching hosts for a
Fetching hosts for b
Fetching hosts for c
Fetching hosts for d
Fetching hosts for e
Total time: 25.169482469558716
Now using sum(range(33000000)):
Fetching hosts for a
Fetching hosts for b
Fetching hosts for c
Fetching hosts for d
Fetching hosts for e
Total time: 43.54587936401367
As you can see, while its performance is still worse than the threaded time.sleep version (probably because the computation takes a bit more than one second, and the CPU-bound work has to compete with everything else running on my laptop), it still vastly outperforms the threaded version.
However, I suspect that as the number of hosts grows, the cost of IPC will slow you down quite a bit. Here's how ThreadPoolExecutor handles 10000 hosts, but with a worker that does nothing (it just returns):
Fetching hosts for c
Fetching hosts for b
Fetching hosts for d
Fetching hosts for a
Fetching hosts for e
Total time: 9.535644769668579
Compare ProcessPoolExecutor:
Fetching hosts for c
Fetching hosts for b
Fetching hosts for a
Fetching hosts for d
Fetching hosts for e
Total time: 36.59257411956787
So it's 4x slower with ProcessPoolExecutor, all caused by the cost of IPC.
So, what does this all mean? I think your best performance will come from using ProcessPoolExecutor, but additionally batching your IPC so that you send large chunks of hosts into a child process, rather than one host at a time.
Something like this (untested, but it gives you the idea):
import time
import itertools
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor as Pool

def call_ws_1(group_id):
    return list(range(10000))

def call_ws_2(hosts):  # This worker now works on a list of hosts
    host_results = []
    for host in hosts:
        # Returns a list of (host, property) tuples
        host_results.append((host, "{} property".format(host)))
    return host_results

def chunk_list(l):
    chunksize = len(l) // 16  # Break the list into smaller pieces
    it = [iter(l)] * chunksize
    for item in itertools.zip_longest(*it):
        yield tuple(x for x in item if x is not None)

def fetch_property(hosts):
    with Pool(max_workers=4) as executor:
        futs = []
        for chunk in chunk_list(hosts):
            futs.append(executor.submit(call_ws_2, chunk))
        for future in concurrent.futures.as_completed(futs):
            try:
                results = future.result()
            except Exception as exp:
                print("Got %s" % exp)
            else:
                for result in results:
                    host, prop = result
                    # Save host and prop to DB

def fetch_hosts(group_ids):
    with Pool(max_workers=4) as executor:
        future_to_grp_id = {executor.submit(call_ws_1, group_id): group_id
                            for group_id in group_ids}
        for future in concurrent.futures.as_completed(future_to_grp_id):
            group_id = future_to_grp_id[future]
            try:
                hosts = future.result()  # this is a list
            except Exception as exp:
                print("Got %s" % exp)
            else:
                print("Fetching hosts for {}".format(group_id))
                fetch_property(hosts)

if __name__ == "__main__":
    start = time.time()
    fetch_hosts(['a', 'b', 'c', 'd', 'e'])
    end = time.time()
    print("Total time: {}".format(end - start))