我在一个八节点 Kubernetes 集群上运行 Dask,我的清单指定了一个调度程序副本和八个工作副本。我的代码正在处理 80 个大小差不多的文件,我想看看性能如何从一个工作人员扩展到八个工作人员。我正在做大致这样的事情:
client: distributed.client.Client = get_client()
workers = client.scheduler_info()['workers']
worker_ips: List[str] = list(workers.keys())
my_files: List[str] = ["list", "of", "files", "to", "be", "processed", "..."]
# This dictionary maps a worker ip to a uniform subset of my_files
files_per_worker = {
"worker_ip1" : ["list", "to", "..."], # files for worker1 only
"worker_ip2" : ["of", "be"], # files for worker2 only
"worker_ip3" : ["files", "processed"] # files for worker3 only
}
# Send each worker a subset of the work
futures = [client.submit(do_work, subset_of_files, workers=[ip])
for (ip, subset_of_files) in files_per_worker.items()]
# Get results from each node, blocking until completion, and reducing partial results into final version
result = finalize_partial_results([f.result() for f in futures])
结果的简化摘要是:
- 一个节点最慢(不足为奇)
- 五个节点最快(一个节点大约占 25%)
- 六个节点峰值(比五个节点长约 80%,仅比一个节点的一半时间好)
- 七个或更多节点非常平坦 - 没有太多的增量性能增益。但是,它并没有恢复到五个节点的性能。
我原以为八个 - 每个物理节点一个工人 - 将是最佳的,但事实并非如此。我什至用不同大小的不同输入数据集对此进行了测试;五个节点总是最好的,在六个节点上有很大的跳跃。
什么可能导致这种情况,我怎样才能避免这种性能下降?据我所知,每个都worker_ip
代表一个物理节点,因此工作应该在选定的工作人员子集之间统一共享。