我有一个简单的任务,它由 dask-scheduler 调度并在工作节点上运行。
我的要求是,我希望能够在用户想要的时候按需停止任务。
我有一个简单的任务,它由 dask-scheduler 调度并在工作节点上运行。
我的要求是,我希望能够在用户想要的时候按需停止任务。
您必须将其构建到您的任务中,也许通过在循环中显式检查分布式变量对象。
from dask.distributed import Variable
stop = Variable()
stop.set(False)
def my_task():
while True:
if stop.get():
return
else:
# do stuff
future = client.submit(my_task)
# wait
stop.set(True)
您将需要像这样明确的东西。任务通常在单独的线程中运行。据我所知,没有办法中断线程(尽管我很乐意学习其他方法)。
@MRocklin。感谢您的建议.. 这是我围绕显式停止运行/实时任务构建的机制。虽然下面的代码没有被重构..请追踪它背后的逻辑..谢谢 - Manoranjan(我会标记你的回答真的很有帮助..):) 继续做好..
import os
import subprocess
from dask.distributed import Variable, Client
from multiprocessing import Process, current_process
import time
global stop
def my_task(proc):
print("my_task..")
print("child proc::", proc)
p = None
childProcessCreated = False
while True:
print("stop.get()::", stop.get())
if stop.get():
print("event triggered for stopping the live task..")
p.terminate()
return 100
else:
if childProcessCreated == False:
print("childProcessCreated::", childProcessCreated)
p = subprocess.Popen("python sleep.py", shell=False)
childProcessCreated = True
print("subprocess p::", p, " type::", type(p))
time.sleep(1)
print("returnning with 20")
return 20
if __name__ == '__main__':
clienta = Client("192.168.1.2:8786")
print("global declaration..")
global stop
stop = Variable("name-xx", client = clienta)
stop.set(False)
future = clienta.submit(my_task, 10)
print("future::waiting for 4 sec..in client side", future)
time.sleep(3)
print("future after sleeping for sec", future)
#print("result::", future.result())
stop.set(True)
print("future after stopping the child process::", future)
print("child process should be stopped by now..")
#print("future::", future)
#print("future result::",future.result())
print("over.!")