2

我有一个简单的任务,它由 dask-scheduler 调度并在工作节点上运行。

我的要求是,我希望能够在用户想要的时候按需停止任务。

4

2 回答 2

1

您必须将其构建到您的任务中,也许通过在循环中显式检查分布式变量对象。

from dask.distributed import Variable

stop = Variable()
stop.set(False)

def my_task():
    while True:
        if stop.get():
            return

        else:
            # do stuff

future = client.submit(my_task)
# wait
stop.set(True)

您将需要像这样明确的东西。任务通常在单独的线程中运行。据我所知,没有办法中断线程(尽管我很乐意学习其他方法)。

于 2018-01-05T13:41:29.973 回答
0

@MRocklin。感谢您的建议.. 这是我围绕显式停止运行/实时任务构建的机制。虽然下面的代码没有被重构..请追踪它背后的逻辑..谢谢 - Manoranjan(我会标记你的回答真的很有帮助..):) 继续做好..

import os
import subprocess
from dask.distributed import Variable, Client 
from multiprocessing import Process, current_process
import time

global stop

def my_task(proc):
    print("my_task..")
    print("child proc::", proc)
    p = None
    childProcessCreated = False
    while True:
      print("stop.get()::", stop.get())
      if stop.get():
        print("event triggered for stopping the live task..")
        p.terminate()
        return 100
      else:
        if childProcessCreated == False:
          print("childProcessCreated::", childProcessCreated)
          p = subprocess.Popen("python sleep.py", shell=False)
          childProcessCreated = True
          print("subprocess p::", p, " type::", type(p))
      time.sleep(1)
    print("returnning with 20")
    return 20

if __name__ == '__main__':

    clienta = Client("192.168.1.2:8786")
    print("global declaration..")
    global stop
    stop = Variable("name-xx", client = clienta)
    stop.set(False)
    future = clienta.submit(my_task, 10)
    print("future::waiting for 4 sec..in client side", future)
    time.sleep(3)
    print("future after sleeping for sec", future)
    #print("result::", future.result())
    stop.set(True)
    print("future after stopping the child process::", future)
    print("child process should be stopped by now..")
    #print("future::", future)
    #print("future result::",future.result())

print("over.!")
于 2018-01-09T18:14:44.790 回答