3

我有以下设置:

在 Linux x64 下使用 Python 3.4 运行的是带有 ws4py-0.4.2 广播处理程序的 CherryPy-10.2.1 网络服务器,它将打印接收到的消息(包含时间戳)和该消息的运行时间。

然后有 60 个客户端线程(我也使用 10、20、50 和 70 个线程,结果相似)将使用 ws4py 连接到服务器。他们将每 0.1 秒发送一次时间戳消息。

在服务器日志文件中大约 48500 行之后,消息只会在 10 秒的间隔后到达。但是客户端线程继续以原始速度发送。似乎消息已发送,被缓冲并且仅在 10 秒后才释放。

如果我终止客户端线程,消息不会丢失,但所有被阻止的消息都会被释放并出现在广播处理程序中(使用它们各自的运行时,取决于你让线程运行多长时间。如果你让客户端线程发送那么长的时间。)

如果我关闭sock客户端并在 850 次发送后再次连接(这将是在 51000 条消息之后),则消息首先会被阻止,但由于重新连接,这些消息很快就会“刷新”并且消息出现在服务器日志中。

这些消息在哪里被阻止?ws4pysend()函数正在使用socket.sendall(),这个发送有问题吗?或者消息是否在服务器端被阻止(因为如果我杀死客户端,消息仍然会传递)?

有人知道这种消息屏蔽吗?

服务器:

# ws4py
from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool
from ws4py.websocket import WebSocket
from ws4py import configure_logger

# general
import cherrypy
import logging
import datetime
import json

# settings
host_ip = '127.0.0.1'
host_port = 20000
filepath = '/path/to/server.log'
loglevel = logging.INFO

# logger
logger = configure_logger(stdout=True, filepath=filepath, level=loglevel)

# cherrypy
cherrypy.config.update({'server.socket_host': host_ip,'server.socket_port': host_port})
WebSocketPlugin(cherrypy.engine).subscribe()
cherrypy.tools.websocket = WebSocketTool()


class BroadcastWebSocketHandler(WebSocket):

    def opened(self):
        logger.info("BroadcastWebSocketHandler - opened")

    def received_message(self, message):
        msg = {}
        try:
            msg = json.loads(str(message))
        except Exception as e:
            logger.critical(repr(e))

        if 'timestamp' in msg:
            format_str = '%Y-%m-%d %H:%M:%S.%f'
            dt_now = datetime.datetime.utcnow()
            dt = datetime.datetime.strptime(msg['timestamp'], format_str)
            delta = dt_now - dt
            logger.info("BroadcastWebSocketHandler - received message: {} - runtime: {}".format(str(message), str(delta)))
        else:
            logger.info("BroadcastWebSocketHandler - received message: {}".format(str(message)))

        cherrypy.engine.publish('websocket-broadcast', str(message))


class Root(object):

    @cherrypy.expose
    def index(self):
        return 'Text.'

    @cherrypy.expose
    def ws(self):
        handler = cherrypy.request.ws_handler


if __name__ == '__main__':
    cherrypy.quickstart(Root(), '/', config={'/ws': {'tools.websocket.on': True,
                                                     'tools.websocket.handler_cls': BroadcastWebSocketHandler}})

客户:

# ws4py
from ws4py.client.threadedclient import WebSocketClient, WebSocketBaseClient
from ws4py import format_addresses

# general
import logging
import json
import time
import datetime
import threading

# settings
host_ip = '127.0.0.1'
host_port = 20000
sleeptime = 0.1
n_clients = 60
filename = '/path/to/client.log'
loglevel = logging.INFO

# logging
log_format = '[%(asctime)-15s] %(message)s'
logging.basicConfig(level=loglevel, filename=filename, format=log_format)

# global
keep_sending = True


def send_messages():
    client = WebSocketBaseClient('ws://{}:{}/ws'.format(host_ip, host_port))
    logging.info('Connecting')
    client.connect()
    while keep_sending:
        time.sleep(sleeptime)
        client.send(json.dumps({'timestamp': datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')}))
    logging.info('Exiting')


if __name__ == '__main__':

    threads = []
    for i in range(n_clients):
        t = threading.Thread(target=send_messages)
        threads.append(t)
        t.daemon = True
        t.start()

    for t in threads:
        try:
            t.join()
        except KeyboardInterrupt:
            keep_sending = False
            logging.info('KeyboardInterrupt')

2018 年 2 月 7 日更新:

“阻塞行为”的原因是self.sock.sendall()ws4py 包的 websocket.py 文件中的调用超时。

您可以编辑broadcast()WebSocketManager(manager.py) 的函数来记录异常。

但我仍然不知道为什么这个例子会产生这些超时。我注意到一件事,使用线程客户端WebSocketClient而不是WebSocketBaseClient.

4

0 回答 0