我有长时间运行的文件 I/O 任务,我希望能够将其移动到守护程序/服务器进程中。CLI 工具将用于对新作业进行排队以运行,查询正在运行的作业的状态,并等待单个作业。Pythonmultiprocessing.managers
看起来像是处理 IPC 的一种非常简单的方法。我希望能够SyncManager.Event
为客户端构建一个等待而不阻塞服务器,但尝试这样做会导致触发“服务器尚未启动”断言。具有讽刺意味的是,这个断言是从服务器发送到客户端的,所以很明显服务器是在某个地方启动的。
这是最小的示例:
#!/usr/bin/env python3
import time
import sys
import concurrent.futures
from multiprocessing.managers import SyncManager
def do_work(files):
"""Simulate doing some work on a set of files."""
print(f"Starting work for {files}.")
time.sleep(2)
print(f"Finished work for {files}.")
# Thread pool to do work in.
pool = concurrent.futures.ProcessPoolExecutor(max_workers=1)
class Job:
job_counter = 1
def __init__(self, files):
"""Setup a job and queue work for files on our thread pool."""
self._job_number = self.job_counter
Job.job_counter += 1
print(f"manager._state.value = {manager._state.value}")
self._finished_event = manager.Event()
print(f"Queued job {self.number()}.")
future = pool.submit(do_work, files)
future.add_done_callback(lambda f : self._finished_event.set())
def number(self):
return self._job_number
def event(self):
"""Get an event which can be waited on for the job to complete."""
return self._finished_event
class MyManager(SyncManager):
pass
MyManager.register("Job", Job)
manager = MyManager(address=("localhost", 16000), authkey=b"qca-authkey")
if len(sys.argv) > 1 and sys.argv[1] == "server":
manager.start()
print(f"Manager listening at {manager.address}.")
while True:
time.sleep(1)
else:
manager.connect()
print(f"Connected to {manager.address}.")
job = manager.Job(["a", "b", "c"])
job.event().wait()
print("Done")
如果我运行客户端,我会看到:
$ ./mp-manager.py
Connected to ('localhost', 16000).
Traceback (most recent call last):
File "./mp-manager.py", line 54, in <module>
job = manager.Job(["a", "b", "c"])
File "/usr/lib/python3.8/multiprocessing/managers.py", line 740, in temp
token, exp = self._create(typeid, *args, **kwds)
File "/usr/lib/python3.8/multiprocessing/managers.py", line 625, in _create
id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
File "/usr/lib/python3.8/multiprocessing/managers.py", line 91, in dispatch
raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError:
---------------------------------------------------------------------------
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/managers.py", line 210, in handle_request
result = func(c, *args, **kwds)
File "/usr/lib/python3.8/multiprocessing/managers.py", line 403, in create
obj = callable(*args, **kwds)
File "./mp-manager.py", line 24, in __init__
self._finished_event = manager.Event()
File "/usr/lib/python3.8/multiprocessing/managers.py", line 740, in temp
token, exp = self._create(typeid, *args, **kwds)
File "/usr/lib/python3.8/multiprocessing/managers.py", line 622, in _create
assert self._state.value == State.STARTED, 'server not yet started'
AssertionError: server not yet started
---------------------------------------------------------------------------
服务器输出为:
$ ./mp-manager.py server
Manager listening at ('127.0.0.1', 16000).
manager._state.value = 0