
I am writing a Python script to fetch the list of hosts corresponding to a particular group_id, which I get via a web service call. The number of hosts can be up to 10,000. Then, for each host, I fetch a value called property from another web service.
So: group-id ----(ws1)----> 10,000s of hosts --(ws2)----> property of each

I am using concurrent.futures as shown in the code below, but it doesn't seem like a clean design and is unlikely to scale well.

import concurrent.futures


def call_ws_1(group_id):
    # fetch list of hosts for group_id
    ...


def call_ws_2(host):
    # find property for host
    ...


def fetch_hosts(group_ids):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_grp_id = {executor.submit(call_ws_1, group_id): group_id for group_id in group_ids}
        for future in concurrent.futures.as_completed(future_to_grp_id):
            group_id = future_to_grp_id[future]
            try:
                hosts = future.result()  # this is a list
            except Exception as exp:
                pass  # logging etc.
            else:
                fetch_property(hosts)


def fetch_property(hosts):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_host = {executor.submit(call_ws_2, host): host for host in hosts}
        for future in concurrent.futures.as_completed(future_to_host):
            host = future_to_host[future]
            try:
                host_prop = future.result()  # a string
            except Exception as exp:
                pass  # logging etc.
            else:
                pass  # save host and property to DB
  1. Would there be any advantage to using ProcessPoolExecutor?
  2. What about fetching all the hosts first (about 40,000 of them) and then calling the web service for the properties?
  3. Any other suggestions for improving this design?

1 Answer

  1. ProcessPoolExecutor has the advantage of not being affected by the GIL. With ThreadPoolExecutor, the GIL will prevent more than one thread from running at a time, unless you're doing I/O. The good news is that it looks like both of your workers will be doing mostly I/O, but any processing that happens in each worker before or after the web service call won't truly run concurrently, and that will hurt your performance. ProcessPoolExecutor doesn't have this restriction, but it adds the overhead of sending the group_id and host data between processes. If you have tens of thousands of hosts, sending them between processes one at a time will carry fairly significant overhead.

  2. I don't think that change by itself will make much difference to performance, because in the end you're still handing each host to a worker one at a time (a sketch of what that variant would look like follows this list).
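
For concreteness, here is a minimal sketch of what that variant might look like; the helper name fetch_all_then_properties is just for illustration, it reuses the call_ws_1/call_ws_2 stubs from the question, and error handling is omitted. Each host still goes to a worker one call at a time:

import concurrent.futures

def fetch_all_then_properties(group_ids):
    # Option 2: collect every host up front, then do one big pass for properties.
    all_hosts = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        for hosts in executor.map(call_ws_1, group_ids):
            all_hosts.extend(hosts)  # roughly 40,000 hosts gathered here

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # map() preserves order, so hosts and results line up, but each host
        # is still submitted individually -- the per-call overhead and the
        # GIL behaviour are the same as in the original design.
        for host, host_prop in zip(all_hosts, executor.map(call_ws_2, all_hosts)):
            pass  # save host and host_prop to DB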

As for number 3, this approach may work quite well if your workers really are doing almost nothing but I/O. But with threads, any CPU-bound work in the workers will kill your performance. I took your exact program layout and implemented your two workers like this:

def call_ws_1(group_id):
    return list(range(20))

def call_ws_2(host):
    sum(range(33000000))  # CPU-bound
    #time.sleep(1)  # I/O-bound
    return "{} property".format(host)

And executed everything like this:

if __name__ == "__main__":
    start = time.time()
    fetch_hosts(['a', 'b', 'c', 'd', 'e'])
    end = time.time()
    print("Total time: {}".format(end-start))

With time.sleep, the output is:

Fetching hosts for d
Fetching hosts for a
Fetching hosts for c
Fetching hosts for b
Fetching hosts for e
Total time: 25.051292896270752

With the sum(range(33000000)) computation, performance is much worse:

Fetching hosts for d
Fetching hosts for a
Fetching hosts for c
Fetching hosts for b
Fetching hosts for e
Total time: 75.81612730026245

Note that the computation takes roughly one second on my laptop:

>>> timeit.timeit("sum(range(33000000))", number=1)
1.023313045501709
>>> timeit.timeit("sum(range(33000000))", number=1)
1.029937982559204

So that's about one second per worker. But because the work is CPU-bound, and therefore affected by the GIL, the threaded version performs horribly.

Here is a ProcessPoolExecutor using time.sleep:

Fetching hosts for a
Fetching hosts for b
Fetching hosts for c
Fetching hosts for d
Fetching hosts for e
Total time: 25.169482469558716

Now using sum(range(33000000)):

Fetching hosts for a
Fetching hosts for b
Fetching hosts for c
Fetching hosts for d
Fetching hosts for e
Total time: 43.54587936401367

As you can see, while this is still slower than the time.sleep runs (probably because each calculation takes a bit more than a second, and the CPU-bound work has to compete with everything else running on the laptop), it still greatly outperforms the threaded version.

However, I suspect that as your number of hosts grows, the cost of IPC will slow you down quite a bit. Here is how the ThreadPoolExecutor handles 10,000 hosts with a worker that does nothing at all (it just returns):

Fetching hosts for c
Fetching hosts for b
Fetching hosts for d
Fetching hosts for a
Fetching hosts for e
Total time: 9.535644769668579

Compare that to the ProcessPoolExecutor:

Fetching hosts for c
Fetching hosts for b
Fetching hosts for a
Fetching hosts for d
Fetching hosts for e
Total time: 36.59257411956787

So the ProcessPoolExecutor is about four times slower, entirely because of the cost of IPC.

So, what does all this mean? I think your best performance will come from using a ProcessPoolExecutor, but additionally batching your IPC so that you send large chunks of hosts to the child processes, rather than one host at a time.

Something like this (untested, but it gives you the idea):

import time
import itertools
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor as Pool

def call_ws_1(group_id):
    return list(range(10000))

def call_ws_2(hosts):  # This worker now works on a list of hosts
    host_results = []
    for host in hosts:
        host_results.append((host, "{} property".format(host)))  # returns a list of (host, property) tuples
    return host_results

def chunk_list(l):
    chunksize = max(1, len(l) // 16)  # Break the list into roughly 16 pieces
    it = [iter(l)] * chunksize
    for item in itertools.zip_longest(*it):
        # Drop only the zip_longest padding; filter(None, ...) would also
        # discard falsy hosts such as 0.
        yield tuple(x for x in item if x is not None)

def fetch_property(hosts):
    with Pool(max_workers=4) as executor:
        futs = []
        for chunk in chunk_list(hosts):
            futs.append(executor.submit(call_ws_2, chunk))
        for future in concurrent.futures.as_completed(futs):
            try:
                 results = future.result()
            except Exception as exp:
                print("Got %s" % exp)
            else:
                for host, host_prop in results:
                    pass  # Save host and host_prop to DB

def fetch_hosts(group_ids):
    with Pool(max_workers=4) as executor:
        future_to_grp_id = {executor.submit(call_ws_1, group_id): group_id for group_id in group_ids}
        for future in concurrent.futures.as_completed(future_to_grp_id):
            group_id = future_to_grp_id[future]
            try:
                hosts = future.result()  # this is a list
            except Exception as exp:
                print("Got %s" % exp)
            else:
                print("Fetching hosts for {}".format(group_id))
                fetch_property(hosts)

if __name__ == "__main__":
    start = time.time()
    fetch_hosts(['a', 'b', 'c', 'd', 'e'])
    end = time.time()
    print("Total time: {}".format(end-start))
Answered 2014-07-24T01:02:38.660