我正在用 python 开发一个简单的客户端-服务器应用程序。我正在使用管理器来设置共享队列,但我不知道如何将任意对象从服务器传递到客户端。我怀疑它与 manager.register 函数有关,但在multiprocessing 文档中并没有很好地解释。那里唯一的例子使用队列,没有别的。
这是我的代码:
#manager demo.py
from multiprocessing import Process, Queue, managers
from multiprocessing.managers import SyncManager
import time
class MyObject():
def __init__( self, p, f ):
self.parameter = p
self.processor_function = f
class MyServer():
def __init__(self, server_info, obj):
print '=== Launching Server ... ====='
(ip, port, pw) = server_info
self.object = obj #Parameters for task processing
#Define queues
self._process_queue = Queue() #Queue of tasks to be processed
self._results_queue = Queue() #Queue of processed tasks to be stored
#Set up IS_Manager class and register server functions
class IS_Manager(managers.BaseManager): pass
IS_Manager.register('get_processQ', callable=self.get_process_queue)
IS_Manager.register('get_resultsQ', callable=self.get_results_queue)
IS_Manager.register('get_object', callable=self.get_object)
#Initialize manager and server
self.manager = IS_Manager(address=(ip, port), authkey=pw)
self.server = self.manager.get_server()
self.server_process = Process( target=self.server.serve_forever )
self.server_process.start()
def get_process_queue(self): return self._process_queue
def get_results_queue(self): return self._results_queue
def get_object(self): return self.object
def runUntilDone(self, task_list):
#Fill the initial queue
for t in task_list:
self._process_queue.put(t)
#Main loop
total_tasks = len(task_list)
while not self._results_queue.qsize()==total_tasks:
time.sleep(.5)
print self._process_queue.qsize(), '\t', self._results_queue.qsize()
if not self._results_queue.empty():
print '\t', self._results_queue.get()
#Do stuff
pass
class MyClient():
def __init__(self, server_info):
(ip, port, pw) = server_info
print '=== Launching Client ... ====='
class IS_Manager(managers.BaseManager): pass
IS_Manager.register('get_processQ')
IS_Manager.register('get_resultsQ')
IS_Manager.register('get_object')
#Set up manager, pool
print '\tConnecting to server...'
manager = IS_Manager(address=(ip, port), authkey=pw)
manager.connect()
self._process_queue = manager.get_processQ()
self._results_queue = manager.get_resultsQ()
self.object = manager.get_object()
print '\tConnected.'
def runUntilDone(self):#, parameters):
print 'Starting client main loop...'
#Main loop
while 1:
if self._process_queue.empty():
print 'I\'m bored here!'
time.sleep(.5)
else:
task = self._process_queue.get()
print task, '\t', self.object.processor_function( task, self.object.parameter )
print 'Client process is quitting. Bye!'
self._clients_queue.get()
还有一个简单的服务器...
from manager_demo import *
def myProcessor( x, parameter ):
return x + parameter
if __name__ == '__main__':
my_object = MyObject( 100, myProcessor )
my_task_list = range(1,20)
my_server_info = ('127.0.0.1', 8081, 'my_pw')
my_crawl_server = MyServer( my_server_info, my_object )
my_crawl_server.runUntilDone( my_task_list )
还有一个简单的客户...
from manager_demo import *
if __name__ == '__main__':
my_server_info = ('127.0.0.1', 8081, 'my_pw')
my_client = MyClient( my_server_info )
my_client.runUntilDone()
当我运行它时,它会崩溃:
erin@Erin:~/Desktop$ python client.py
=== Launching Client ... =====
Connecting to server...
Connected.
Starting client main loop...
2 Traceback (most recent call last):
File "client.py", line 5, in <module>
my_client.runUntilDone()
File "/home/erin/Desktop/manager_demo.py", line 84, in runUntilDone
print task, '\t', self.object.processor_function( task, self.object.parameter )
AttributeError: 'AutoProxy[get_object]' object has no attribute 'parameter'
为什么 python 对 Queues 或 processor_function 没有问题,但对 object 参数感到窒息?谢谢!