3

GRPC 不支持python multiprocessing

我想通过从进程进行 GRPC 调用然后关闭通道并删除对 GRPC 对象的所有引用来解决这个问题,这样它们就不会被腌制。

然而,我害怕

Traceback (most recent call last):
  File "/home/edgar/miniconda3/envs/ml_env/lib/python3.7/multiprocessing/queues.py", line 236, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/edgar/miniconda3/envs/ml_env/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PickleError: can't pickle repeated message fields, convert to list first

是否可以在使用后删除所有 GRPC 对象以便我可以使用多处理?

如果不是,我对 GRPC 和多处理的问题有什么不明白的地方?

代码看起来像这样。如果证明是必要的,我可以创建一个最小的工作示例。

def run_env(env_id: str, queue: multiprocessing.Queue, predictor_address: str, id: str = '-1'):
    # Create environment
    env = er.create_env_from_id(env_id=env_id)

    # Run Forest, Run
    state = env.reset()
    done = False
    while not done:
        done = True
        action = get_prediction(state=state, predictor_address=predictor_address)
        new_state, reward, done, info = env.step(action)
        tup = (id, state, action, reward, new_state, done)
        queue.put(tup)

        if done:
            state = env.reset()
            done = False


def get_prediction(state: np.ndarray, predictor_address: str) -> np.ndarray:
    with grpc.insecure_channel(predictor_address, options=options) as channel:
        stub = pgrpc.MlAgentPredictorStub(channel)
        action_pb = stub.Predict(get_state(nstate=state))

    action = action_pb.action
    del action_pb
    return action

def env_test():
    predictor_address = 'localhost:50051'
    runners = []
    pqueue = multiprocessing.Queue()
    for ii in range(2):
        args = ('Test', pqueue, predictor_address, str(ii), )
        runner = multiprocessing.Process(target=run_env, args=args)
        runner.daemon = True
        runner.start()
        runners.append(runner)
4

0 回答 0