0

我有三个文件如下

""" main.py """
import time
import ray

class LocalBuffer(dict):
    def __call__(self):
        return self

@ray.remote
class Worker():
    def __init__(self, learner):
        self.local = LocalBuffer()
        self.learner = learner

    def sample(self):
        for i in range(10):
            self.local.update({
                'state': [1, 2, 3]
            })
            print(self.local)
            self.learner.update_buffer.remote(self.local)

@ray.remote
class Learner():
    def __init__(self):
        self.buffer = {}

    def update_buffer(self, local_buffer):
        print(local_buffer)
        self.buffer['state'] = local_buffer['state']

ray.init()

learner = Learner.remote()
worker = Worker.remote(learner)

worker.sample.remote()

time.sleep(10)

如果我删除所有与ray. 如果没有,就会发生错误。错误消息说stateinlocal_buffer中没有update_buffer。我知道错误是由于在中LocalBuffer定义的worker.py——如果我定义Worker.local为内置的dict,一切都会好起来的。但是为什么我不能用LocalBuffer?我在这里确实需要它,但我不知道如何使它工作。

更新

我知道问题出在哪里。原因是workerlearner处于不同的进程中。而用户定义的对象如self.local不能在进程之间传递。对于这个特定问题,我可以通过强制转换self.localdictwhenself.local被传递给self.learner.update_buffer. 我尝试导入LocalBufferin learner.py,但没有成功。也许我必须更多地了解多处理才能弄清楚。如果有人愿意填写一些有用的信息,我将不胜感激。

4

1 回答 1

0

我们必须制作LocalBuffer一个射线演员才能使其工作。以下代码按需要工作。

import ray

@ray.remote
class LocalBuffer(dict):
    # have to redefine these functions in order to make it work with ray
    def __getitem__(self, k):
        return super().__getitem__(k)

    def update(self, d):
        super().update(d)

    def __call__(self):
        # cannot return self since self is a ray actor
        return dict(super().items())


@ray.remote
class Worker():
    def __init__(self, learner):
        self.local = LocalBuffer.remote()
        self.learner = learner

    def sample(self):
        for i in range(10):
            id = self.local.update.remote({
                'state': [1, 2, 3]
            })
            print(ray.get(self.local.__call__.remote()))
            self.learner.update_buffer.remote(self.local)


@ray.remote
class Learner():
    def __init__(self):
        self.buffer = {}

    def update_buffer(self, local_buffer):
        print(ray.get(local_buffer.__call__.remote()))
        self.buffer['state'] = ray.get(local_buffer.__getitem__.remote('state'))
        print('learner buffer', self.buffer)

ray.init()

learner = Learner.remote()
worker = Worker.remote(learner)

ray.get(worker.sample.remote())
于 2019-06-03T23:26:35.640 回答