2

我最近开始使用 ray 进行并行执行。在我的串行用例中,我有一个“超时”,它会停止执行我的循环。我想知道如何使用 ray 做同样的事情。

ray 当前处理异常的方式是执行所有的运行,收集执行过程中发生的错误,并在最后显示消息。

start_time = time.time()

@ray.remote
class Test(object):

    def __init__(self):
        self.res = None

    def run(self, c):
        time.sleep(0.25)
        print(c)
        self.res = c
        if time.time()-start_time > 1:
            print('Raise error at %d !' % c)
            raise TimeoutError('Time out.....!!')

    def get_res(self):
        return self.res


test = [Test.remote() for _ in range(num_cpus)]

for i in range(num_cpus*2):
    test[i % num_cpus].run.remote(i)

res = ray.get([t.get_res.remote() for t in test])

如果当前作业超时,是否可以中断 ray 执行下一个作业?是否有一种全球性的方式来中断处决?

4

1 回答 1

0

现在的工作是什么意思?Ray 是一个分布式环境,一些/所有任务是并行运行的(而不是顺序运行,就像你在常规 python 循环中运行它一样),所以没有“当前作业”的概念。

无论如何,使用您的代码:

res = ray.get([t.get_res.remote() for t in test])

您正在等待最后一项任务完成(因此在所有任务完成/失败之前不会初始化“res”对象)。

您可以做的是稍微修改您的代码,并实现如下内容:

res = [t.get_res.remote() for t in test]
from ray.exceptions import RayTaskError

while len(res): 
    done_id, res= ray.wait(res)
    try:
        task_res = ray.get(done_id[0])  # debug/explore 'done_id' object to understand the results output (if task finished successfully or not)
    except (RayTaskError, Exception):  
        ray.shutdown()
    
于 2021-02-09T15:09:45.210 回答