1

我正在运行一个应用程序,它不能坐下来等待与 Python 管理器的成功/不成功连接。客户端应用程序应该尝试将一些信息发送到假定正在运行的服务器,如果它失败,则采取另一种措施。问题是每当服务器关闭时,连接需要很长时间才能将控制权返回给客户端应用程序,并且它不能浪费时间等待它,因为还有其他事情要做。

我想出了一个方案,其中一个中间对象负责连接,但它只工作一次。假设第一次,当仍然没有连接到服务器时,这个中间对象在不阻塞客户端应用程序的情况下处理连接部分。如果由于某种原因,服务器宕机又回来了,我就不能让它工作了。

假设我有以下服务器:

# server.py

from multiprocessing import Queue, managers
from multiprocessing.queues import Empty
import select
import threading


class RServer(object):
    def __init__(self, items_buffer):

        self.items_buffer = items_buffer

    def receive_items(self):
        while True:
            (_, [], []) = select.select([self.items_buffer._reader], [], [])
            while True:
                try:
                    item = self.items_buffer.get(block=False)
                    # do something with item
                    print('item received')
                except Empty:
                    break


class SharedObjectsManager(managers.BaseManager):
    pass


if __name__ == '__main__':

    items_buffer = Queue()

    remote_server = RServer(items_buffer)

    remote_server_th = threading.Thread(target=remote_server.receive_items)
    remote_server_th.start()

    SharedObjectsManager.register('items_buffer', callable=lambda: items_buffer)

    shared_objects_manager = SharedObjectsManager(address=('localhost', 5001),
                                                  authkey=str.encode('my_server'),
                                                  serializer='xmlrpclib')

    s = shared_objects_manager.get_server()
    s.serve_forever()

这是处理连接的中间对象:

# bridge.py

from multiprocessing.managers import BaseManager
import threading
import socket


class ConnectionManager():
    def __init__(self):
        self.remote_manager = BaseManager(address=('localhost', 5001),
                                          authkey=b'my_server',
                                          serializer='xmlrpclib')
        self.remote_manager.register('items_buffer')
        self.items_buffer = None
        self.items_buffer_lock = threading.Lock()
        self.connecting = False
        self.connecting_lock = threading.Lock()
        self.connection_started_condition = threading.Condition()

    def transmit_item(self, item):
        try:
            with self.items_buffer_lock:
                self.items_buffer.put(item)
        except (AttributeError, EOFError, IOError):
            with self.connection_started_condition:
                with self.connecting_lock:
                    if not self.connecting:
                        self.connecting = True

                        connect_th = threading.Thread(target=self.connect_to_server,
                                                      name='Client Connect')
                        connect_th.start()

                self.connection_started_condition.notify()

            raise ConnectionError('Connection Error')

    def connect_to_server(self):
        with self.connection_started_condition:
            self.connection_started_condition.wait()
        try:
            self.remote_manager.connect()
        except socket.error:
            pass
        else:
            try:
                with self.items_buffer_lock:
                    self.items_buffer = self.remote_manager.items_buffer()
            except (AssertionError, socket.error):
                pass

        with self.connecting_lock:
            self.connecting = False


class ConnectionError(Exception):
    def __init__(self, value):
        self.value = value

    def __str__(self):
        return repr(self.value)

最后是客户端应用程序:

# client.py

import time

from bridge import ConnectionManager, ConnectionError


remote_buffer = ConnectionManager()

while True:
    try:
        remote_buffer.transmit_item({'rubish': None})
        print('item sent')
    except ConnectionError:
        # do something else
        print('item not sent')

    # do other stuff
    print('doing other stuff')
    time.sleep(15)

我肯定在线程上做错了什么,但我无法弄清楚。任何想法?

4

0 回答 0