1

我正在用 python 开发一个简单的客户端-服务器应用程序。我正在使用管理器来设置共享队列,但我不知道如何将任意对象从服务器传递到客户端。我怀疑它与 manager.register 函数有关,但在multiprocessing 文档中并没有很好地解释。那里唯一的例子使用队列,没有别的。

这是我的代码:

#manager demo.py
from multiprocessing import Process, Queue, managers
from multiprocessing.managers import SyncManager
import time

class MyObject():
    def __init__( self, p, f ):
        self.parameter = p
        self.processor_function = f

class MyServer():
    def __init__(self, server_info, obj):
        print '=== Launching Server ... ====='
        (ip, port, pw) = server_info
        self.object = obj       #Parameters for task processing

        #Define queues
        self._process_queue = Queue()       #Queue of tasks to be processed
        self._results_queue = Queue()       #Queue of processed tasks to be stored

        #Set up IS_Manager class and register server functions
        class IS_Manager(managers.BaseManager): pass
        IS_Manager.register('get_processQ', callable=self.get_process_queue)
        IS_Manager.register('get_resultsQ', callable=self.get_results_queue)
        IS_Manager.register('get_object', callable=self.get_object)

        #Initialize manager and server
        self.manager = IS_Manager(address=(ip, port), authkey=pw)
        self.server = self.manager.get_server()

        self.server_process = Process( target=self.server.serve_forever )
        self.server_process.start()

    def get_process_queue(self): return self._process_queue
    def get_results_queue(self): return self._results_queue
    def get_object(self): return self.object

    def runUntilDone(self, task_list):
        #Fill the initial queue
        for t in task_list:
            self._process_queue.put(t)

        #Main loop
        total_tasks = len(task_list)
        while not self._results_queue.qsize()==total_tasks:
            time.sleep(.5)
            print self._process_queue.qsize(), '\t', self._results_queue.qsize()
            if not self._results_queue.empty():
                print '\t', self._results_queue.get()
            #Do stuff
            pass

class MyClient():
    def __init__(self, server_info):
        (ip, port, pw) = server_info
        print '=== Launching Client ... ====='

        class IS_Manager(managers.BaseManager): pass

        IS_Manager.register('get_processQ')
        IS_Manager.register('get_resultsQ')
        IS_Manager.register('get_object')

        #Set up manager, pool
        print '\tConnecting to server...'
        manager = IS_Manager(address=(ip, port), authkey=pw)
        manager.connect()

        self._process_queue = manager.get_processQ()
        self._results_queue = manager.get_resultsQ()
        self.object = manager.get_object()

        print '\tConnected.'

    def runUntilDone(self):#, parameters):
        print 'Starting client main loop...'

        #Main loop
        while 1:
            if self._process_queue.empty():
                print 'I\'m bored here!'
                time.sleep(.5)
            else:
                task = self._process_queue.get()
                print task, '\t', self.object.processor_function( task, self.object.parameter )

        print 'Client process is quitting.  Bye!'
        self._clients_queue.get()

还有一个简单的服务器...

from manager_demo import *

def myProcessor( x, parameter ):
    return x + parameter

if __name__ == '__main__':
    my_object = MyObject( 100, myProcessor )
    my_task_list = range(1,20)
    my_server_info = ('127.0.0.1', 8081, 'my_pw')

    my_crawl_server = MyServer( my_server_info, my_object )
    my_crawl_server.runUntilDone( my_task_list )

还有一个简单的客户...

from manager_demo import *
if __name__ == '__main__':
    my_server_info = ('127.0.0.1', 8081, 'my_pw')
    my_client = MyClient( my_server_info )
    my_client.runUntilDone()

当我运行它时,它会崩溃:

erin@Erin:~/Desktop$ python client.py 
=== Launching Client ... =====
    Connecting to server...
    Connected.
Starting client main loop...
2   Traceback (most recent call last):
  File "client.py", line 5, in <module>
    my_client.runUntilDone()
  File "/home/erin/Desktop/manager_demo.py", line 84, in runUntilDone
    print task, '\t', self.object.processor_function( task, self.object.parameter )
AttributeError: 'AutoProxy[get_object]' object has no attribute 'parameter'

为什么 python 对 Queues 或 processor_function 没有问题,但对 object 参数感到窒息?谢谢!

4

1 回答 1

2

您遇到此问题是因为parameter您的类上的属性MyObject()不是可调用的。

文档指出,用于_exposed_指定代理此 typeid 的方法名称序列。在没有指定公开列表的情况下,共享对象的所有“公共方法”都可以访问。(这里的“公共方法”是指具有 __ call __() 方法且名称不以 '_' 开头的任何属性。)

因此,您将需要手动公开parameter属性MyObject,大概是通过更改您的方法MyObject()

class MyObject():
    def __init__(self, p, f):
        self._parameter = p
        self.processor_function = f

    def parameter(self):
        return self._parameter

此外,您需要将任务更改为:

 self.object.processor_function(task, self.object.parameter())

HTH。

于 2011-03-15T02:50:41.913 回答