GRPC 不支持python multiprocessing
。
我想通过从进程进行 GRPC 调用然后关闭通道并删除对 GRPC 对象的所有引用来解决这个问题,这样它们就不会被腌制。
然而,我害怕
Traceback (most recent call last):
File "/home/edgar/miniconda3/envs/ml_env/lib/python3.7/multiprocessing/queues.py", line 236, in _feed
obj = _ForkingPickler.dumps(obj)
File "/home/edgar/miniconda3/envs/ml_env/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
_pickle.PickleError: can't pickle repeated message fields, convert to list first
是否可以在使用后删除所有 GRPC 对象以便我可以使用多处理?
如果不是,我对 GRPC 和多处理的问题有什么不明白的地方?
代码看起来像这样。如果证明是必要的,我可以创建一个最小的工作示例。
def run_env(env_id: str, queue: multiprocessing.Queue, predictor_address: str, id: str = '-1'):
# Create environment
env = er.create_env_from_id(env_id=env_id)
# Run Forest, Run
state = env.reset()
done = False
while not done:
done = True
action = get_prediction(state=state, predictor_address=predictor_address)
new_state, reward, done, info = env.step(action)
tup = (id, state, action, reward, new_state, done)
queue.put(tup)
if done:
state = env.reset()
done = False
def get_prediction(state: np.ndarray, predictor_address: str) -> np.ndarray:
with grpc.insecure_channel(predictor_address, options=options) as channel:
stub = pgrpc.MlAgentPredictorStub(channel)
action_pb = stub.Predict(get_state(nstate=state))
action = action_pb.action
del action_pb
return action
def env_test():
predictor_address = 'localhost:50051'
runners = []
pqueue = multiprocessing.Queue()
for ii in range(2):
args = ('Test', pqueue, predictor_address, str(ii), )
runner = multiprocessing.Process(target=run_env, args=args)
runner.daemon = True
runner.start()
runners.append(runner)