The setup is basically:
Client side:
import dask.dataframe as dd
from dask.distributed import Client, worker_client

with Client(...) as client:
    dfs = [client.submit(load_data_lazily, i) for i in ...]
    ddf = dd.from_delayed(dfs)
    xxx = ...  # how to serialize the handle to ddf?
    # ... or xxx could be a future instead of a dask collection ...
    result = client.submit(do_something_with_lazy_remotely, xxx)
Worker (worker-client) side:
def do_something_with_lazy_remotely(xxx):
    with worker_client() as client:
        ddf = ...  # how to deserialize from xxx without materializing it?
        # slice to a reasonable size
        df = client.compute(ddf.loc[...], sync=True)
        # do some computation on df
        client.submit(...)
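I can get something working by using client.persist and client.publish_dataset. The problem is that I am tracking by hand when to call unpublish_dataset, and with asynchronous behavior this becomes difficult.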
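A minimal sketch of that manual tracking, made safe for concurrent callers, might look like the following. PublishedDatasetTracker is a hypothetical helper (not part of dask): it reference-counts holders of a dataset name, publishing on the first acquire and unpublishing on the last release. It assumes client is a dask.distributed.Client (only publish_dataset(**{name: collection}) and unpublish_dataset(name) are used) and that the collection has already been persisted.

```python
import threading
from collections import Counter
from contextlib import contextmanager


class PublishedDatasetTracker:
    """Hypothetical helper: reference-counts publish/unpublish of named datasets.

    Assumes `client` is a dask.distributed.Client, or any object exposing
    publish_dataset(**{name: obj}) and unpublish_dataset(name).
    """

    def __init__(self, client):
        self._client = client
        self._counts = Counter()  # name -> number of current holders
        self._lock = threading.Lock()  # callbacks may run on other threads

    def acquire(self, name, collection):
        with self._lock:
            if self._counts[name] == 0:
                # First holder: make the (already persisted) collection visible.
                self._client.publish_dataset(**{name: collection})
            self._counts[name] += 1

    def release(self, name):
        with self._lock:
            if self._counts[name] <= 0:
                raise KeyError(f"dataset {name!r} is not currently held")
            self._counts[name] -= 1
            if self._counts[name] == 0:
                del self._counts[name]
                # Last holder finished: drop the published reference.
                self._client.unpublish_dataset(name)

    @contextmanager
    def hold(self, name, collection):
        # Synchronous convenience wrapper around acquire/release.
        self.acquire(name, collection)
        try:
            yield name
        finally:
            self.release(name)
```

For asynchronous work, one option (again an assumption about how the tasks are structured) is to call tracker.acquire(name, ddf) before client.submit, pass only the name to the remote function, fetch the collection there with client.get_dataset(name) inside worker_client(), and release via future.add_done_callback(lambda f: tracker.release(name)). distributed runs done-callbacks in a separate thread, which is why the counter is guarded by a lock.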
Is there a (more) natural way to do this now?
EDIT: It seems like this might be the way to go here?