1

我有一个基于 Twisted 的 TCP 服务器,用于监听 TCP 客户端的连接。我还有一个 Django 应用程序,它通过 Twisted 服务器上的 AMP 向 TCP 客户端发送消息。

命令由 Django 应用发出,然后等待 AMP 的响应,而 AMP 又要等待 TCP 客户端的响应。

我的问题与同时发送多个命令有关:AMP 服务器以延迟(Deferred)方式处理消息,当 TCP 客户端连接中断时,不同命令的消息可能会互相交叉。因此我需要一种方法(例如连接池或其他方式)来管理这些命令,以解决连接负载过重的问题。

我的 AMP 服务器是

"""Amp Server """
from twisted.protocols.amp import AMP, Command, String

class AmpProcessor(Command):
    """AMP command carrying one order for a tracker client.

    All six request fields and the single ``answer`` response field are
    declared as AMP ``String`` values.
    """

    # Generator expressions (rather than list comprehensions) keep the loop
    # variable out of the class namespace on Python 2.
    arguments = list(
        (field, String())
        for field in ('proto', 'imei', 'ip', 'port', 'cmmd', 'mssg')
    )
    response = list((field, String()) for field in ('answer',))

class AMPServer(AMP):
    """AMP protocol that forwards orders to connected tracker clients.

    ``factories`` is not defined in this class; it must be attached before
    a command arrives. It is indexed by ``proto`` and each value must expose
    a ``clients`` mapping indexed by client IP.
    """

    @AmpProcessor.responder
    def processor(self, proto, imei, ip, port, cmmd, mssg):
        """Run ``cmmd`` on the client registered under ``proto``/``ip``.

        ``imei`` and ``port`` are part of the command signature but are not
        used for the lookup.

        Returns ``{'answer': str(result)}`` where ``result`` is whatever the
        client's ``runCommand(cmmd, mssg)`` returned, or
        ``{'answer': 'FAIL'}`` if the lookup or the call raised.
        """
        # Function-scope import so this block does not depend on a new
        # module-level import.
        from twisted.python import log

        try:
            client = self.factories[proto].clients[ip]
            answer = str(client.runCommand(cmmd, mssg))
        except Exception:
            # Still answer FAIL to the caller, but record the traceback so
            # unknown clients and decoder bugs are no longer invisible.
            # A bare ``except:`` would also have trapped SystemExit and
            # KeyboardInterrupt.
            log.err(None, 'AMP order %r for %s/%s failed' % (cmmd, proto, ip))
            return {'answer': 'FAIL'}
        return {'answer': answer}

我的服务器所做的是,当我收到连接时,得到的参数是

  • proto:存放该类硬件 TCP 客户端的索引(即 factories 字典的键)
  • imei:客户端标识符
  • ip:客户端连接的 IP
  • port:客户端连接的端口
  • cmmd:命令,即将要执行的方法
  • mssg:将发送给客户端的消息

收到请求后,AMP 会在保存客户端连接的字典中查找对应的协议实例,并通过该实例发送命令

因此我想找到一种管理连接的方法,比如使用连接池,使客户端连接不会因此崩溃

我的 TCP 服务器

from twisted.internet import threads
from twisted.internet.protocol import Factory, Protocol

class TrackerServer(Protocol):
    """Twisted protocol serving one tracker device connection.

    Each connection loads ``listener.protocols.<devicer>Decoder`` and hands
    every received chunk to that decoder. Connected instances register
    themselves in the shared ``clients`` dict under the peer IP, so a newer
    connection from the same IP replaces the older entry.
    """

    def __init__(self, clients, devicer):
        """Store the shared client registry and the device type name.

        :param clients: dict shared with the factory, peer IP -> protocol.
        :param devicer: device name used to locate the decoder class.
        """
        self.clients = clients
        self.devicer = devicer
        self.decoder = None  # set only after the decoder opened successfully
        self.host = None
        self.peer = None
        self.state = False

    def connectionMade(self):
        """Load and open the decoder, then register this connection.

        If the decoder cannot be imported, found or opened, the connection
        is closed instead of being left open without a decoder.
        """
        decoderName = '%sDecoder' % (self.devicer,)
        try:
            decoderModule = __import__(
                'listener.protocols.%s' % (decoderName,),
                fromlist=[decoderName])
            decoder = getattr(decoderModule, decoderName)()
            self.peer = self.transport.getPeer()
            self.host = self.transport.getHost()
            decoder.openConnection(self.host.host, self.host.port,
                                   self.peer.host, self.peer.port)
        except (ImportError, AttributeError, ValueError):
            # __import__/getattr raise ImportError/AttributeError, which the
            # former ``except ValueError`` let escape.
            print("Oops!  Connection was not started")
            self.transport.loseConnection()
            return
        self.decoder = decoder
        print('Connection made to %s from %s' % (self.host, self.peer))
        self.clients[self.peer.host] = self

    def connectionLost(self, reason):
        """Unregister this connection and close its decoder."""
        peerHost = self.peer.host if self.peer is not None else None
        # Only remove the registry entry if it still points at this
        # instance; a newer connection from the same IP may own it now.
        if peerHost is not None and self.clients.get(peerHost) is self:
            del self.clients[peerHost]
        if self.decoder is not None:
            self.decoder.closeConnection()
        if peerHost is not None:
            print("Connection lost from %s : %s" % (peerHost, reason))
        else:
            print("Connection unknown peer: %s" % (reason,))

    def dataReceived(self, data):
        """Pass received data to the decoder and send back any reply."""
        if self.decoder is None:
            # Setup failed and the transport is closing; ignore late data.
            return
        response = self.decoder.processDatagram(data)
        # The decoder returns False when there is nothing to send; None and
        # empty replies are skipped as well rather than written.
        if response:
            self.sendMessage(response)

    def sendMessage(self, response):
        """Write ``response`` to this client's transport."""
        self.transport.write(response)

class TrackerFactory(Factory):
    """Factory creating ``TrackerServer`` protocols for one device type.

    All protocols built by one factory share the same ``clients`` dict.
    """

    def __init__(self, devicer):
        """Create an empty client registry for the ``devicer`` device type."""
        self.clients = {}
        self.devicer = devicer
        # Defined up front so reading it before the first connection gives
        # None instead of raising AttributeError.
        self.connectedProtocol = None

    def buildProtocol(self, addr):
        """Build the protocol for a new connection and remember it.

        ``connectedProtocol`` always refers to the most recently built
        protocol, not to every live connection.
        """
        proto = TrackerServer(self.clients, self.devicer)
        # The stock Factory.buildProtocol sets this back-reference; an
        # override has to do it itself.
        proto.factory = self
        self.connectedProtocol = proto
        return proto

我的 *.tac 文件

import os, sys
import ConfigParser
from twisted.application import internet, service
from twisted.internet import protocol, reactor
from listener.TrackerServer import TrackerFactory
from listener.AMPServer import AMPServer
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.application.internet import StreamServerEndpointService

# twistd application file: one TCP tracker server per configured device
# type, plus one AMP server that can reach all of their clients.
PROJECT_DIR = os.path.abspath(os.path.dirname(__file__))
sys.path.append(PROJECT_DIR)

path = None
config = ConfigParser.ConfigParser()
# Look in the working directory first (the original behaviour) and fall back
# to the copy next to this file, so starting twistd from another directory
# does not end in a NoSectionError.
if not config.read('protocols.cfg'):
    config.read(os.path.join(PROJECT_DIR, 'protocols.cfg'))

application = service.Application("tracker")

# Device name -> TrackerFactory; shared with the AMP server below.
factories = {}

for device in config.get('protocols', 'keys').split(','):
    # Tolerate "a, b" and trailing commas in the keys list.
    device = device.strip()
    if not device:
        continue
    devicer = config.get(device, 'name')
    factories[devicer] = TrackerFactory(devicer)
    internet.TCPServer(config.getint(device, 'port'), factories[devicer]).setServiceParent(application)

endpoint = TCP4ServerEndpoint(reactor, 8750)
factory = Factory()
factory.protocol = AMPServer
# Set on the class, so every AMPServer instance sees the same factories.
factory.protocol.factories = factories
ampService = StreamServerEndpointService(endpoint, factory)
ampService.setServiceParent(application)
4

0 回答 0