1

我有一些长时间运行的代码(大约 5-10 分钟的处理),我正试图作为 Dask 运行Future。这是一系列的几个离散步骤,我可以作为一个函数运行:

result : Future = client.submit(my_function, arg1, arg2)

或者我可以分成中间步骤:

# compose the result from the same intermediate results but with Futures
intermediate1 = client.submit(my_function1, arg1)
intermediate2 = client.submit(my_function2, arg1, arg2)
intermediate3 = client.submit(my_function3, intermediate2, arg1)
result = client.submit(my_function4, intermediate3)

如果我在本地运行它(例如,result = my_function(arg1, arg2)),它就完成了。如果我将它提交给 Dask,我会立即得到Future回复 - 正如预期的那样 - 但这项工作永远不会完成。此外,如果我将result.key用作跟踪作业状态的一种方式,然后将未来重构为result = Future(key),它的状态始终pending

我想首先让它按原样运行,以便我可以将我的处理卸载到我的 Dask 工作人员而不是处理请求的 API,然后我希望能够开始跨节点拆分工作,以便我可以改进表现。但为什么我的工作就这么消失了?查看我的 Dask 调度程序 Web 界面,作业似乎都没有出现。但我知道 Dask 正在工作,因为我可以从我的 Jupyter 笔记本向它提交代码。

client.submit从 Flask 服务器调用,并返回密钥以便以后使用。大致:

@app.route('/submit')
def submit():
    # ...
    future = client.submit(my_function, arg1, arg2)
    return jsonify({"key": future.key})

@app.route('/status/<key>')
def status(key):
    future = Future(key)
    return jsonify({"status": future.status})

当我的应用程序部署到 Kubernetes 时,我的/submit路由会返回一个 Future 密钥,但我的 Dask 状态页面没有显示任何处理任务。如果我在本地运行 Flask,我确实会看到一个任务出现,并且我的作业的输出确实会在预期的延迟后出现;但是,当我使用从 返回的 Future 键到达自己的/status/<key>路径时/submit,它始终显示状态为pending

4

1 回答 1

0

如果指向某个任务的所有未来都消失了,那么 Dask 可以随意忘记该任务。这允许 Dask 清理工作,而不是让所有中间结果永远存在。

如果您想保留参考,那么您需要保留期货。这告诉 Dask 你仍然关心结果。您可以通过创建字典在您的烧瓶应用程序中本地执行此操作。

futures = {}

@app.route('/submit')
def submit():
    # ...
    future = client.submit(my_function, arg1, arg2)
    futures[future.key] = future
    return jsonify({"key": future.key})

@app.route('/status/<key>')
def status(key):
    future = futures[key]
    return jsonify({"status": future.status})

但是您还需要考虑何时可以清理和释放这些期货。通过这种方法,您将慢慢填满您的记忆。

于 2020-06-13T15:30:05.710 回答