我有以下设置:
在 Linux x64 下使用 Python 3.4 运行的是带有 ws4py-0.4.2 广播处理程序的 CherryPy-10.2.1 网络服务器,它将打印接收到的消息(包含时间戳)和该消息的运行时间。
然后有 60 个客户端线程(我也使用 10、20、50 和 70 个线程,结果相似)将使用 ws4py 连接到服务器。他们将每 0.1 秒发送一次时间戳消息。
在服务器日志文件中大约 48500 行之后,消息只会在 10 秒的间隔后到达。但是客户端线程继续以原始速度发送。似乎消息已发送,被缓冲并且仅在 10 秒后才释放。
如果我终止客户端线程,消息不会丢失,但所有被阻止的消息都会被释放并出现在广播处理程序中(使用它们各自的运行时,取决于你让线程运行多长时间。如果你让客户端线程发送那么长的时间。)
如果我关闭sock
客户端并在 850 次发送后再次连接(这将是在 51000 条消息之后),则消息首先会被阻止,但由于重新连接,这些消息很快就会“刷新”并且消息出现在服务器日志中。
这些消息在哪里被阻止?ws4pysend()
函数正在使用socket.sendall()
,这个发送有问题吗?或者消息是否在服务器端被阻止(因为如果我杀死客户端,消息仍然会传递)?
有人知道这种消息屏蔽吗?
服务器:
# ws4py
from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool
from ws4py.websocket import WebSocket
from ws4py import configure_logger
# general
import cherrypy
import logging
import datetime
import json
# settings
host_ip = '127.0.0.1'
host_port = 20000
filepath = '/path/to/server.log'
loglevel = logging.INFO
# logger
logger = configure_logger(stdout=True, filepath=filepath, level=loglevel)
# cherrypy
cherrypy.config.update({'server.socket_host': host_ip,'server.socket_port': host_port})
WebSocketPlugin(cherrypy.engine).subscribe()
cherrypy.tools.websocket = WebSocketTool()
class BroadcastWebSocketHandler(WebSocket):
def opened(self):
logger.info("BroadcastWebSocketHandler - opened")
def received_message(self, message):
msg = {}
try:
msg = json.loads(str(message))
except Exception as e:
logger.critical(repr(e))
if 'timestamp' in msg:
format_str = '%Y-%m-%d %H:%M:%S.%f'
dt_now = datetime.datetime.utcnow()
dt = datetime.datetime.strptime(msg['timestamp'], format_str)
delta = dt_now - dt
logger.info("BroadcastWebSocketHandler - received message: {} - runtime: {}".format(str(message), str(delta)))
else:
logger.info("BroadcastWebSocketHandler - received message: {}".format(str(message)))
cherrypy.engine.publish('websocket-broadcast', str(message))
class Root(object):
@cherrypy.expose
def index(self):
return 'Text.'
@cherrypy.expose
def ws(self):
handler = cherrypy.request.ws_handler
if __name__ == '__main__':
cherrypy.quickstart(Root(), '/', config={'/ws': {'tools.websocket.on': True,
'tools.websocket.handler_cls': BroadcastWebSocketHandler}})
客户:
# ws4py
from ws4py.client.threadedclient import WebSocketClient, WebSocketBaseClient
from ws4py import format_addresses
# general
import logging
import json
import time
import datetime
import threading
# settings
host_ip = '127.0.0.1'
host_port = 20000
sleeptime = 0.1
n_clients = 60
filename = '/path/to/client.log'
loglevel = logging.INFO
# logging
log_format = '[%(asctime)-15s] %(message)s'
logging.basicConfig(level=loglevel, filename=filename, format=log_format)
# global
keep_sending = True
def send_messages():
client = WebSocketBaseClient('ws://{}:{}/ws'.format(host_ip, host_port))
logging.info('Connecting')
client.connect()
while keep_sending:
time.sleep(sleeptime)
client.send(json.dumps({'timestamp': datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')}))
logging.info('Exiting')
if __name__ == '__main__':
threads = []
for i in range(n_clients):
t = threading.Thread(target=send_messages)
threads.append(t)
t.daemon = True
t.start()
for t in threads:
try:
t.join()
except KeyboardInterrupt:
keep_sending = False
logging.info('KeyboardInterrupt')
2018 年 2 月 7 日更新:
“阻塞行为”的原因是self.sock.sendall()
ws4py 包的 websocket.py 文件中的调用超时。
您可以编辑broadcast()
类WebSocketManager
(manager.py) 的函数来记录异常。
但我仍然不知道为什么这个例子会产生这些超时。我注意到一件事,使用线程客户端WebSocketClient
而不是WebSocketBaseClient
.