进口:
from dask.distributed import Client
import streamz
import time
模拟工作量:
def increment(x):
time.sleep(0.5)
return x + 1
假设我想在本地 Dask 客户端上处理一些工作负载:
if __name__ == "__main__":
with Client() as dask_client:
ps = streamz.Stream()
ps.scatter().map(increment).gather().sink(print)
for i in range(10):
ps.emit(i)
这按预期工作,但sink(print)
当然会强制等待每个结果,因此流不会并行执行。
但是,如果我使用buffer()
允许缓存结果,则gather()
似乎不再正确收集所有结果,并且解释器在获得结果之前退出。这种方法:
if __name__ == "__main__":
with Client() as dask_client:
ps = streamz.Stream()
ps.scatter().map(increment).buffer(10).gather().sink(print)
# ^
for i in range(10): # - allow parallel execution
ps.emit(i) # - before gather()
...不会为我打印任何结果。Python 解释器只是在启动脚本后不久退出并在 buffer()
发出结果之前退出,因此不会打印任何内容。
但是,如果主进程被迫等待一段时间,结果将以并行方式打印(因此它们不会相互等待,而是几乎同时打印):
if __name__ == "__main__":
with Client() as dask_client:
ps = streamz.Stream()
ps.scatter().map(increment).buffer(10).gather().sink(print)
for i in range(10):
ps.emit(i)
time.sleep(10) # <- force main process to wait while ps is working
这是为什么?我认为gather()
应该等待一批 10 个结果,因为buffer()
在将它们刷新到gather()
. 为什么gather()
在这种情况下不阻塞?
有没有一种很好的方法来检查 Stream 是否仍然包含正在处理的元素以防止主进程过早退出?