我正在尝试测试一些在断开连接后重新连接到服务器的代码。这在测试之外工作得很好,但是在运行测试时它无法确认套接字已断开连接。
我正在使用 Gevent Stream Server 来模拟一个真正的监听服务器:
import gevent.server
from gevent import queue
class TestServer(gevent.server.StreamServer):
def __init__(self, *args, **kwargs):
super(TestServer, self).__init__(*args, **kwargs)
self.sockets = {}
def handle(self, socket, address):
self.sockets[address] = (socket, queue.Queue())
socket.sendall('testing the connection\r\n')
gevent.spawn(self.recv, address)
def recv(self, address):
socket = self.sockets[address][0]
queue = self.sockets[address][1]
print 'Connection accepted %s:%d' % address
try:
for data in socket.recv(1024):
queue.put(data)
except:
pass
def murder(self):
self.stop()
for sock in self.sockets.iteritems():
print sock
sock[1][0].shutdown(socket.SHUT_RDWR)
sock[1][0].close()
self.sockets = {}
def run_server():
test_server = TestServer(('127.0.0.1', 10666))
test_server.start()
return test_server
我的测试看起来像这样:
def test_can_reconnect(self):
test_server = run_server()
client_config = {'host': '127.0.0.1', 'port': 10666}
client = Connection('test client', client_config, get_config())
client.connect()
assert client.socket_connected
test_server.murder()
#time.sleep(4) #tried sleeping. no dice.
assert not client.socket_connected
assert client.server_disconnect
test_server = run_server()
client.reconnect()
assert client.socket_connected
它在assert not client.socket_connected
.
我在接收期间检测到“非数据”。如果它是None,那么我设置一些变量,以便其他代码可以决定是否重新连接(如果是user_disconnect,则不要重新连接等等)。这种行为有效并且过去一直对我有效,直到现在我才尝试对其进行测试。套接字连接和本地函数范围或其他东西有什么奇怪的吗?就像停止服务器后连接仍然存在一样。
我要测试的代码是开放的:https ://github.com/kyleterry/tenyks.git
如果你运行测试,你会看到我试图修复的那个失败。