1

我正在学习如何使用 Twisted AMP。我正在开发一个程序,将数据从客户端发送到服务器并将数据插入 SQLite3 DB。然后服务器将结果发送回客户端,指示成功或错误(尝试和除外可能不是最好的方法,但它只是我解决主要问题时的临时解决方案)。为了做到这一点,我修改了一个示例,该示例最初进行了求和并返回了结果,因此我意识到这可能不是执行我想做的最有效的方法。特别是我正在尝试对多次插入进行一些计时(即多次将数据发送到服务器以进行多次插入)并且我已经包含了我编写的代码。

我已经尝试了几种方法来解决这个问题,包括将 ClientCreator 传递给 reactor.callWhenRunning() 但你不能通过延迟来做到这一点。

任何有关如何执行此操作的建议、建议或帮助将不胜感激。这是代码。

服务器:

from twisted.protocols import amp
from twisted.internet import reactor
from twisted.internet.protocol import Factory
import sqlite3, time

class Insert(amp.Command):
    arguments = [('data', amp.Integer())]
    response = [('insert_result', amp.Integer())]

class Protocol(amp.AMP):
    def __init__(self):     
       self.conn = sqlite3.connect('biomed1.db')
       self.c =self.conn.cursor()
       self.res=None

    @Insert.responder
    def dbInsert(self, data):
        self.InsertDB(data) #call the DB inserter
        result=self.res     # send back the result of the insertion
        return {'insert_result': result}

    def InsertDB(self,data):
      tm=time.time()
      print "insert time:",tm
      chx=data
      PID=2
      device_ID=5
      try:
        self.c.execute("INSERT INTO btdata4(co2_data, patient_Id, sensor_Id) VALUES ('%s','%s','%s')" % (chx, PID, device_ID))    
      except Exception, err:
             print err
             self.res=0
      else:
             self.res=1

      self.conn.commit()


pf = Factory()
pf.protocol = Protocol
reactor.listenTCP(1234, pf) 
reactor.run()

客户:

from twisted.internet import reactor
from twisted.internet.protocol import ClientCreator
from twisted.protocols import amp
import time

class Insert(amp.Command):
    arguments = [('data', amp.Integer())]
    response = [('insert_result', amp.Integer())]

def connected(protocol):
    return protocol.callRemote(Insert, data=5555).addCallback(gotResult)

def gotResult(result):
    print 'insert_result:', result['insert_result']
    tm=time.time()
    print "stop", tm    

def error(reason):
    print "error", reason

tm=time.time()
print "start",tm
for i in range (10): #send data over ten times
  ClientCreator(reactor, amp.AMP).connectTCP(
     '127.0.0.1', 1234).addCallback(connected).addErrback(error)

reactor.run()

代码结束。

谢谢你。

4

1 回答 1

4

很少有东西可以改善您的服务器代码。

首先也是最重要的:在twisted 中不鼓励使用直接数据库访问函数,因为它们通常会导致阻塞。Twisted 对数据库访问有很好的抽象,它为数据库连接提供了扭曲的方法 - twisted.adbapi

现在重用 db 连接:如果您想跨多个协议实例重用某些资产(如数据库连接),您应该在 Factory 的构造函数中初始化它们,或者如果您不希望在启动时启动这些东西,创建一个资源访问方法,它将在第一次方法调用时启动资源,然后将其分配给类变量并在后续调用时返回。

当 Factory 创建特定的 Protocol 实例时,它会在协议内部添加对自身的引用,参见twisted.internet.protocol 的第 97 行

然后在您的协议实例中,您可以访问共享数据库连接实例,例如:

self.factory.whatever_name_for_db_connection.doSomething() 

重新设计的服务器代码(我没有可用的 python、twisted 甚至像样的 IDE,所以这几乎没有经过测试,一些错误是可以预料的)

from twisted.protocols import amp
from twisted.internet import reactor
from twisted.internet.protocol import Factory
import time

class AMPDBAccessProtocolFactory(Factory):
    def getDBConnection(self):
        if 'dbConnection' in dir(self):
            return self.dbConnection
        else:
            self.dbConnection = SQLLiteTestConnection(self.dbURL)
            return self.dbConnection

class SQLLiteTestConnection(object):
    """
    Provides abstraction for database access and some business functions.
    """
    def __init__(self,dbURL):
        self.dbPool =  adbapi.ConnectionPool("sqlite3" , dbURL,  check_same_thread=False)

    def insertBTData4(self,data):
        query = "INSERT INTO btdata4(co2_data, patient_Id, sensor_Id) VALUES (%s,%s,%s)" 
        tm=time.time()
        print "insert time:",tm
        chx=data
        PID=2
        device_ID=5
        dF = self.dbPool.runQuery(query,(chx, PID, device_ID)) 
        dF.addCallback(self.onQuerySuccess,insert_data=data)
        return dF
    def onQuerySuccess(self,insert_data,*r):
        """
        Here you can inspect query results or add any other valuable information to be parsed at client.
        For the test sake we will just return True to a customer if query was a success.
        original data available at kw argument insert_data
        """
        return True


class Insert(amp.Command):
    arguments = [('data', amp.Integer())]
    response = [('insert_result', amp.Integer())]

class MyAMPProtocol(amp.AMP):

    @Insert.responder
    def dbInsert(self, data):
        db = self.factory.getDBConnection()
        dF = db.insertBTData4(data)
        dF.addErrback(self.onInsertError,data)
        return dF

    def onInsertError(self, error, data):
        """
        Here you could do some additional error checking or inspect data 
        which was handed for insert here. For now we will just throw the same exception again
        so that the client gets notified
        """
        raise error

if __name__=='__main__':
    pf = AMPDBAccessProtocolFactory()
    pf.protocol = MyAMPProtocol
    pf.dbURL='biomed1.db'
    reactor.listenTCP(1234, pf) 
    reactor.run()

现在到客户端。如果 AMP 遵循整体 RPC 逻辑(目前无法对其进行测试),它应该能够在多个调用中细读相同的连接。所以我创建了一个 ServerProxy 类,它将保存该可阅读的协议实例并为调用提供抽象:

from twisted.internet import reactor
from twisted.internet.protocol import ClientCreator
from twisted.protocols import amp
import time

class Insert(amp.Command):
    arguments = [('data', amp.Integer())]
    response = [('insert_result', amp.Integer())]

class ServerProxy(object):
    def connected(self,protocol):
        self.serverProxy = protocol # assign protocol as instance variable
        reactor.callLater(5,self.startMultipleInsert) #after five seconds start multiple insert procedure

    def remote_insert(self,data):
        return self.serverProxy.callRemote(Insert, data)

    def startMultipleInsert(self):
        for i in range (10): #send data over ten times
            dF = self.remote_insert(i)
            dF.addCallback(self.gotInsertResult)
            dF.addErrback(error)

    def gotInsertResult(self,result):
        print 'insert_result:', str(result)
        tm=time.time()
        print "stop", tm    

def error(reason):
    print "error", reason


def main():
    tm=time.time()
    print "start",tm
    serverProxy = ServerProxy()
    ClientCreator(reactor, amp.AMP).connectTCP('127.0.0.1', 1234).addCallback(serverProxy.connected).addErrback(error)
    reactor.run()    

if __name__=='__main__':
    main()
于 2012-04-25T20:31:11.670 回答