0

我正在使用 Dask 进行复杂的操作。首先我做了一个减少,产生一个中等大小的df(几MB),然后我需要将它传递给每个工人来计算最终结果,所以我的代码看起来有点像这样

intermediate_result = ddf.reduction().compute()

final_result = ddf.reduction(
    chunk=function, chunk_kwargs={"intermediate_result": intermediate_result}
)

但是我收到看起来像这样的警告消息

Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  % (format_bytes(len(b)), s)

我试过这样做

intermediate_result = client.scatter(intermediate_result, broadcast=True)

但这不起作用,因为函数现在将其视为 Future 对象,而不是它应该是的数据类型。我似乎找不到任何关于如何使用 scatter 减少的文档,有人知道如何做到这一点吗?还是我应该忽略警告消息并像我一样传递中等大小的df?

4

1 回答 1

0

实际上,最好的解决方案可能不是分散您的物化结果,而是首先避免计算它。您可以简单地删除.compute(),这意味着所有计算都在一个阶段完成,结果会自动移动到您需要的地方。

或者,如果你想在阶段之间有一个清晰的界限,你可以使用

intermediate_result = ddf.reduction().persist()

这将启动减少并将其存储在工作人员身上,而不会将其拉给客户。您可以选择在下一步之前等待此完成或不完成。

于 2020-12-16T17:09:23.080 回答