2

我正在运行该massconnect.py模块以模拟打开许多并行 web-socket 连接,但是当我尝试打开 15000 个连接(在 massconnect json 中指定)一段时间后,我收到以下错误消息:

Unhandled Error
Traceback (most recent call last):
  File "C:\Python27\lib\site-packages\twisted\python\log.py", line 88, in callWithLogger
    return callWithContext({"system": lp}, func, *args, **kw)
  File "C:\Python27\lib\site-packages\twisted\python\log.py", line 73, in callWithContext
    return context.call({ILogContext: newCtx}, func, *args, **kw)
  File "C:\Python27\lib\site-packages\twisted\python\context.py", line 118, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "C:\Python27\lib\site-packages\twisted\python\context.py", line 81, in callWithContext
    return func(*args,**kw)
--- <exception caught here> ---
  File "C:\Python27\lib\site-packages\twisted\internet\iocpreactor\reactor.py", line 120, in _callEventCallback
    evt.callback(rc, bytes, evt)
  File "C:\Python27\lib\site-packages\twisted\internet\iocpreactor\tcp.py", line 285, in cbConnect
    self.socket.setsockopt(
exceptions.AttributeError: 'Client' object has no attribute 'socket'

web-socket 连接的打开被中断,我得到了这个异常。

我调整了我的massconnect.py模块以确认应用程序的 web-socket 身份验证。我正在测试,以便每次分配客户端时都会发送唯一的身份验证消息。

我已经在注册表中将 MaxUserPort 设置为 65534。我原以为这个问题可能是内存不足造成的,但在不同的机器上检查之后,我意识到这不是原因。

这可能是一个已知的 python/twisted 问题吗?

@Glyph:

这是 massconnect.py 源代码:

'''
Created on Jan 29, 2013

@author: boro.petrovic
'''
###############################################################################
##
##  Copyright 2011,2012 Tavendo GmbH
##
##  Licensed under the Apache License, Version 2.0 (the "License");
##  you may not use this file except in compliance with the License.
##  You may obtain a copy of the License at
##
##      http://www.apache.org/licenses/LICENSE-2.0
##
##  Unless required by applicable law or agreed to in writing, software
##  distributed under the License is distributed on an "AS IS" BASIS,
##  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
##  See the License for the specific language governing permissions and
##  limitations under the License.
##
###############################################################################

import sys
import time
from twisted.internet import reactor
from twisted.internet import error
from twisted.internet.defer import Deferred, returnValue, inlineCallbacks
from twisted.internet.interfaces import IReactorTime
from autobahn.websocket import connectWS
from autobahn.websocket import WebSocketClientFactory, WebSocketClientProtocol
import time

# Shared token source. Starts as a placeholder string; MassConnectTest.__init__
# rebinds it to the opened token file, and MassConnectProtocol.onOpen reads one
# line from it for every connection that opens.
f='start'

class MassConnectProtocol(WebSocketClientProtocol):
    """WebSocket client that authenticates with one token per connection.

    When a connection opens, the next line of the module-level token file
    ``f`` is consumed, the owning test is told about the new connection and
    an authentication message carrying that token is sent.
    """

    def sendHello(self, message):
        """Send ``message`` to the server as the payload of a single message."""
        self.sendMessage(payload=message)

    def onOpen(self):
        """Take the next token, report the connection and send the hello."""
        token = f.readline().rstrip()
        self.factory.test.onConnected()
        self.sendHello('messagecontent' + token)
        # Kept so onMessage can say which token a reply belongs to.
        self.wstoken = token

    def onMessage(self, msg, binary):
        """Print each received message together with this client's token."""
        print("Got message from wstoken " + self.wstoken + " :" + msg)

class MassConnectFactory(WebSocketClientFactory):
    """Client factory that schedules a reconnect whenever its test allows it.

    The attributes ``test`` and ``retrydelay`` are read from the instance;
    ``retrydelay`` is divided by 1000 before being handed to
    ``reactor.callLater``.
    """

    def clientConnectionFailed(self, connector, reason):
        """Report a failed attempt and retry after the delay if permitted."""
        if not self.test.onFailed():
            return
        delay = float(self.retrydelay) / 1000.
        reactor.callLater(delay, connector.connect)

    def clientConnectionLost(self, connector, reason):
        """Report a dropped connection and retry after the delay if permitted."""
        if not self.test.onLost():
            return
        delay = float(self.retrydelay) / 1000.
        reactor.callLater(delay, connector.connect)

class MassConnect:
    """Open a fixed number of WebSocket connections to one server in batches.

    Connections are started ``batchsize`` at a time with ``batchdelay``
    milliseconds between batches. Every successful open is counted by
    ``onConnected``; once the count equals the target, the Deferred returned
    by ``run`` fires with a summary dict.
    """

    def __init__(self, name, uri, connections, batchsize, batchdelay, retrydelay):
        """Store the test parameters and reset all counters.

        name        -- label used in the report and the result dict
        uri         -- WebSocket URI passed to every client factory
        connections -- total number of connections to establish
        batchsize   -- number of connections started per batch
        batchdelay  -- pause between batches, in milliseconds
        retrydelay  -- pause before a reconnect, in milliseconds
        """
        self.name = name
        self.uri = uri
        self.batchsize = batchsize
        self.batchdelay = batchdelay
        self.retrydelay = retrydelay
        self.failed = 0        # connection attempts that failed
        self.lost = 0          # established connections that dropped
        self.targetCnt = connections
        self.currentCnt = 0    # connection attempts started so far
        self.actual = 0        # connections that reached onOpen

    def run(self):
        """Start connecting and return a Deferred that fires with the result."""
        self.d = Deferred()
        # Wall-clock time: time.clock() measures CPU time on Unix and was
        # removed in Python 3.8, so it is not suitable for elapsed time.
        self.started = time.time()
        self.connectBunch()
        return self.d

    def onFailed(self):
        """Count a failed attempt; returning True asks the factory to retry."""
        self.failed += 1
        return True

    def onLost(self):
        """Count a lost connection; returning True asks the factory to retry."""
        self.lost += 1
        return True

    def onConnected(self):
        """Count an opened connection and fire the result at the target."""
        self.actual += 1
        if self.actual % self.batchsize == 0:
            sys.stdout.write(".")
        if self.actual != self.targetCnt:
            return

        self.ended = time.time()
        duration = self.ended - self.started
        print(" connected %d clients to %s at %s in %s seconds (retries %d = failed %d + lost %d)" % (self.currentCnt, self.name, self.uri, duration, self.failed + self.lost, self.failed, self.lost))

        # No reactor.run() here: this method is invoked from a protocol
        # callback, i.e. while the reactor is already running, so that call
        # could only raise. The previous handler for it referenced the unbound
        # name `twisted`, which turned the error into a NameError and kept the
        # Deferred below from ever firing.
        result = {'name': self.name,
                  'uri': self.uri,
                  'connections': self.targetCnt,
                  'retries': self.failed + self.lost,
                  'lost': self.lost,
                  'failed': self.failed,
                  'duration': duration}
        self.d.callback(result)

    def connectBunch(self):
        """Start the next batch and reschedule itself while more are pending."""
        if self.currentCnt + self.batchsize < self.targetCnt:
            c = self.batchsize
            redo = True
        else:
            # Last batch: only whatever is still missing.
            c = self.targetCnt - self.currentCnt
            redo = False
        for _ in xrange(c):
            # One factory per connection so each can retry independently.
            factory = MassConnectFactory(self.uri, origin=None, protocols=[("myprotocol")])
            factory.test = self
            factory.retrydelay = self.retrydelay
            factory.protocol = MassConnectProtocol
            factory.setProtocolOptions(version=13)
            factory.setSessionParameters(url="myurl", origin=None, protocols=["myprotocol"])
            connectWS(factory)
            self.currentCnt += 1
        if redo:
            reactor.callLater(float(self.batchdelay)/1000., self.connectBunch)

class MassConnectTest:
    """Run a MassConnect test against every server listed in a spec.

    ``spec`` is a dict with a ``'servers'`` list (each entry providing
    ``'name'`` and ``'uri'``) and an ``'options'`` dict providing
    ``'connections'``, ``'batchsize'``, ``'batchdelay'`` and ``'retrydelay'``.
    """

    def __init__(self, spec, tokenfile="C:/tokens.txt"):
        """Open the token file and remember the spec.

        spec      -- test specification, see the class docstring
        tokenfile -- path of the file read line by line for tokens; the
                     default is the path that was previously hard-coded
        """
        global f
        # Rebinds the module-level handle that the protocol reads tokens from.
        f = open(tokenfile, "r")
        self.spec = spec

    @inlineCallbacks
    def run(self):
        """Test the servers one after another and return their results.

        Returns (through the Deferred) a list with one result per server, in
        spec order. The token file is closed when the run ends, whether it
        succeeded or failed.
        """
        global f
        results = []
        try:
            for server in self.spec['servers']:
                opts = self.spec['options']
                test = MassConnect(server['name'],
                                   server['uri'],
                                   opts['connections'],
                                   opts['batchsize'],
                                   opts['batchdelay'],
                                   opts['retrydelay'])
                outcome = yield test.run()
                results.append(outcome)
        finally:
            # returnValue() leaves the generator by raising, so a close placed
            # after it would never execute; close here instead.
            f.close()
        returnValue(results)

在脚本中,我用虚拟字符串替换了机密数据。

我从 tokens.txt 文件中读取 15000 个用户各自的身份验证令牌,并用这些令牌通过 web-socket 连接到被测应用程序。通过这种方式,我模拟了许多不同用户的连接,并且每个用户都有一个独立的、处于活动状态的 web-socket 连接。

不幸的是,我没有达到建立所有 15000 个连接的地步,因为在运行脚本(建立连接)时,我收到了未处理的错误消息。

4

1 回答 1

0

如果您能把 massconnect.py 精简成一个 SSCCE(简短、自包含、可正确运行的示例),会很有帮助。但是,如果问题仅在数千个并发连接的级别出现,您可能是用完了文件描述符(抱歉,Twisted 的错误消息不是很好)。

有关在操作系统中更改此参数的一些信息,请参阅此问题。

于 2015-10-26T22:10:52.393 回答