我们公司目前正在将 prefect.io用于数据工作流程(ELT、报告生成、ML 等)。我们刚刚开始添加由Dask提供支持的并行任务执行功能。我们的流程使用临时AWS Fargate容器执行,该容器将使用 Dask LocalCluster,并将一定数量的工作人员、线程、进程传递到 LocalCluster 对象中。
我们在 Dask 上的旅程看起来很像这样:
- 继续使用单机 LocalCluster 直到我们超出允许的最大 cpu/内存
- 当我们扩展单个容器时,在初始容器上生成额外的工作容器(la dask-kubernetes)并将它们加入 LocalCluster。
我们目前从具有 256 个 cpu(.25 vCPU) 和 512 个内存的容器开始,并将 LocalCluster 固定到 1 个 n_workers 和 3 个 thread_per_worker 以获得合理数量的并行度。但是,这确实是猜测工作。1 n_workers,因为它是一台少于 1 个 vcpu 和 3 个线程的机器,因为根据我之前在 Fargate 中运行其他基于 python 的应用程序的经验,这对我来说听起来并不疯狂。在一个非常简单的示例中,这似乎可以正常工作,该示例仅将函数映射到项目列表。
RENEWAL_TABLES = [
'Activity',
'CurrentPolicyTermStaus',
'PolicyRenewalStatus',
'PolicyTerm',
'PolicyTermStatus',
'EndorsementPolicyTerm',
'PolicyLifeState'
]
RENEWAL_TABLES_PAIRS = [
(i, 1433 + idx) for idx, i in enumerate(RENEWAL_TABLES)
]
@task(state_handlers=[HANDLER])
def dummy_step():
LOGGER.info('Dummy Step...')
sleep(15)
@task(state_handlers=[HANDLER])
def test_map(table):
LOGGER.info('table: {}...'.format(table))
sleep(15)
with Flow(Path(__file__).stem, SCHEDULE, state_handlers=[HANDLER]) as flow:
first_step = dummy_step()
test_map.map(RENEWAL_TABLES_PAIRS).set_dependencies(upstream_tasks=[first_step])
我看到一次执行的任务不超过 3 个。
我真的很想了解如何最好地配置 n_workers(单机)、线程、进程,因为我们将单机的大小扩展到添加远程工作人员。我知道这取决于我的工作量,但是您可以在单个流程中看到多个事物的组合,其中一个任务从数据库提取到 csv,而另一个任务运行 pandas 计算。我在网上看到的东西似乎应该是线程=文档请求的 cpu 数量,但似乎您仍然可以在 Fargate 中使用少于一个 cpu 实现并行性。
任何反馈都将不胜感激,并可以帮助其他希望以更短暂的方式利用 Dask 的人。
鉴于对于 vCPU,Fargate 从 .25 -> .50 -> 1 -> 2 -> 4 递增,我认为将 1 个工作人员设置为 1 个 vcpu 设置是安全的。但是,考虑到 Fargate vcpu 分配的工作原理,了解如何为每个工作人员的线程数选择一个好的上限会很有帮助。