我有一些长时间运行的代码(大约 5-10 分钟的处理),我正试图作为 Dask 运行Future
。这是一系列的几个离散步骤,我可以作为一个函数运行:
result : Future = client.submit(my_function, arg1, arg2)
或者我可以分成中间步骤:
# compose the result from the same intermediate results but with Futures
intermediate1 = client.submit(my_function1, arg1)
intermediate2 = client.submit(my_function2, arg1, arg2)
intermediate3 = client.submit(my_function3, intermediate2, arg1)
result = client.submit(my_function4, intermediate3)
如果我在本地运行它(例如,result = my_function(arg1, arg2)
),它就完成了。如果我将它提交给 Dask,我会立即得到Future
回复 - 正如预期的那样 - 但这项工作永远不会完成。此外,如果我将result.key
用作跟踪作业状态的一种方式,然后将未来重构为result = Future(key)
,它的状态始终为pending
。
我想首先让它按原样运行,以便我可以将我的处理卸载到我的 Dask 工作人员而不是处理请求的 API,然后我希望能够开始跨节点拆分工作,以便我可以改进表现。但为什么我的工作就这么消失了?查看我的 Dask 调度程序 Web 界面,作业似乎都没有出现。但我知道 Dask 正在工作,因为我可以从我的 Jupyter 笔记本向它提交代码。
我client.submit
从 Flask 服务器调用,并返回密钥以便以后使用。大致:
@app.route('/submit')
def submit():
# ...
future = client.submit(my_function, arg1, arg2)
return jsonify({"key": future.key})
@app.route('/status/<key>')
def status(key):
future = Future(key)
return jsonify({"status": future.status})
当我的应用程序部署到 Kubernetes 时,我的/submit
路由会返回一个 Future 密钥,但我的 Dask 状态页面没有显示任何处理任务。如果我在本地运行 Flask,我确实会看到一个任务出现,并且我的作业的输出确实会在预期的延迟后出现;但是,当我使用从 返回的 Future 键到达自己的/status/<key>
路径时/submit
,它始终显示状态为pending。