我正在运行一个应用程序,它不能坐下来等待与 Python 管理器的成功/不成功连接。客户端应用程序应该尝试将一些信息发送到假定正在运行的服务器,如果它失败,则采取另一种措施。问题是每当服务器关闭时,连接需要很长时间才能将控制权返回给客户端应用程序,并且它不能浪费时间等待它,因为还有其他事情要做。
我想出了一个方案,其中一个中间对象负责连接,但它只工作一次。假设第一次,当仍然没有连接到服务器时,这个中间对象在不阻塞客户端应用程序的情况下处理连接部分。如果由于某种原因,服务器宕机又回来了,我就不能让它工作了。
假设我有以下服务器:
# server.py
from multiprocessing import Queue, managers
from multiprocessing.queues import Empty
import select
import threading
class RServer(object):
def __init__(self, items_buffer):
self.items_buffer = items_buffer
def receive_items(self):
while True:
(_, [], []) = select.select([self.items_buffer._reader], [], [])
while True:
try:
item = self.items_buffer.get(block=False)
# do something with item
print('item received')
except Empty:
break
class SharedObjectsManager(managers.BaseManager):
pass
if __name__ == '__main__':
items_buffer = Queue()
remote_server = RServer(items_buffer)
remote_server_th = threading.Thread(target=remote_server.receive_items)
remote_server_th.start()
SharedObjectsManager.register('items_buffer', callable=lambda: items_buffer)
shared_objects_manager = SharedObjectsManager(address=('localhost', 5001),
authkey=str.encode('my_server'),
serializer='xmlrpclib')
s = shared_objects_manager.get_server()
s.serve_forever()
这是处理连接的中间对象:
# bridge.py
from multiprocessing.managers import BaseManager
import threading
import socket
class ConnectionManager():
def __init__(self):
self.remote_manager = BaseManager(address=('localhost', 5001),
authkey=b'my_server',
serializer='xmlrpclib')
self.remote_manager.register('items_buffer')
self.items_buffer = None
self.items_buffer_lock = threading.Lock()
self.connecting = False
self.connecting_lock = threading.Lock()
self.connection_started_condition = threading.Condition()
def transmit_item(self, item):
try:
with self.items_buffer_lock:
self.items_buffer.put(item)
except (AttributeError, EOFError, IOError):
with self.connection_started_condition:
with self.connecting_lock:
if not self.connecting:
self.connecting = True
connect_th = threading.Thread(target=self.connect_to_server,
name='Client Connect')
connect_th.start()
self.connection_started_condition.notify()
raise ConnectionError('Connection Error')
def connect_to_server(self):
with self.connection_started_condition:
self.connection_started_condition.wait()
try:
self.remote_manager.connect()
except socket.error:
pass
else:
try:
with self.items_buffer_lock:
self.items_buffer = self.remote_manager.items_buffer()
except (AssertionError, socket.error):
pass
with self.connecting_lock:
self.connecting = False
class ConnectionError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
最后是客户端应用程序:
# client.py
import time
from bridge import ConnectionManager, ConnectionError
remote_buffer = ConnectionManager()
while True:
try:
remote_buffer.transmit_item({'rubish': None})
print('item sent')
except ConnectionError:
# do something else
print('item not sent')
# do other stuff
print('doing other stuff')
time.sleep(15)
我肯定在线程上做错了什么,但我无法弄清楚。任何想法?