1

我正在尝试在 Twisted 中制作一个简单的分布式作业客户端/服务器系统。基本上步骤是:

  1. 使用一些作业和相关文件启动 JobServer
  2. 启动 JobClient 实例,它们连接到 JobServer 并请求 Jobs
  3. 服务器提供 JobClient 作业并通过 TCP 发送序列化 JSON
  4. 经过大量计算后,JobClient 发回结果并等待新作业
  5. 冲洗并重复

但是我无法在本地机器上调试我的协议。

作业服务器.py

from twisted.application import internet, service
from twisted.internet import reactor, protocol, defer
from twisted.protocols import basic
from twisted.protocols.basic import Int32StringReceiver
from twisted.web import client
import random
import json
import base64
from logger import JobLogger

class JobServerProtocol(Int32StringReceiver):

    log = JobLogger("server.log")

    def connectionMade(self):
        self.log.write("Connected to client")
        self.sendJob(None)

    def stringReceived(self, msg):
        self.log.write("Recieved job from client: %s" % msg)
        self.sendJob(msg)

    def sendJob(self, msg):
        d = self.factory.getJob(msg)

        def onError(err):
            self.transport.write("Internal server error")
        d.addErrback(onError)

        def sendString(newjob_dict):
            encoded_str = json.dumps(newjob_dict)
            self.transport.write(encoded_str)
            self.log.write("Sending job to client: %s" % encoded_str)
        d.addCallback(sendString)

    def lengthLimitExceeded(self, msg):
        self.transport.loseConnection()

class JobServerFactory(protocol.ServerFactory):
    protocol = JobServerProtocol

    def __init__(self, jobs, files):
        assert len(jobs) == len(files)
        self.jobs = jobs
        self.files = files
        self.results = []

    def getJob(self, msg):

        # on startup the client will not have a message to send
        if msg:
            # recreate pickled msg
            msg_dict = json.loads(msg)
            self.results.append((msg_dict['result'], msg_dict['jidx']))

        # if we're all done, let the client know
        if len(self.jobs) == 0:
            job = None
            jidx = -1
            encoded = ""
        else:
            # get new job for client to process
            jidx = random.randint(0, len(self.jobs) - 1)
            job = self.jobs[jidx]
            del self.jobs[jidx]

            # get file
            with open(self.files[jidx], 'r') as f:
                filecontents = f.read()
            encoded = base64.b64encode(filecontents)

        # create dict object to send to client
        response_msg = {
            "job" : job,
            "index" : jidx,
            "file" : encoded
        }

        return defer.succeed(response_msg)

# args for factory
files = ['test.txt', 'test.txt', 'test.txt']
jobs = ["4*4-5", "2**2-5", "2/9*2/3"]

application = service.Application('jobservice')
factory = JobServerFactory(jobs=jobs, files=files)
internet.TCPServer(12345, factory).setServiceParent(
    service.IServiceCollection(application))

作业客户端.py

from twisted.internet import reactor, protocol
from twisted.protocols.basic import Int32StringReceiver
import json
import time
from logger import JobLogger

class JobClientProtocol(Int32StringReceiver):

    log = JobLogger("client.log")

    def stringReceived(self, msg):

        # unpack job from server
        server_msg_dict = json.loads(msg)
        job = server_msg_dict["job"]
        index = server_msg_dict["index"]
        filestring = server_msg_dict["file"]

        if index == -1:
            # we're done with all tasks
            self.transport.loseConnection()

        self.log.write("Recieved job %d from server with file '%s'" % (index, filestring))

        # do something with file 
        # job from the server...
        time.sleep(5)
        result = { "a" : 1, "b" : 2, "c" : 3}
        result_msg = { "result" : result, "jidx" : index }

        self.log.write("Completed job %d from server with result '%s'" % (index, result))

        # serialize and tell server
        result_str = json.dumps(result_msg)
        self.transport.write(encoded_str)

    def lengthLimitExceeded(self, msg):
        self.transport.loseConnection()

class JobClientFactory(protocol.ClientFactory):

    def buildProtocol(self, addr):
        p = JobClientProtocol()
        p.factory = self
        return p

reactor.connectTCP("127.0.0.1", 12345, JobClientFactory())
reactor.run()

日志记录.py

class JobLogger(object):
    def __init__(self, filename):
        self.log = open(filename, 'a')

    def write(self, string):
        self.log.write("%s\n" % string)

    def close(self):
        self.log.close()

仅使用一个客户端在本地运行和测试:

$ twistd -y JobServer.py -l ./jobserver.log --pidfile=./jobserver.pid
$ python JobClient.py

我遇到的问题:

  1. 客户端和服务器 .log 文件不会被可靠地写入 - 有时直到我终止进程之后才会写入。
  2. 在客户端连接并且服务器发回消息后,协议卡住了。该消息似乎永远不会到达客户端。

一般来说,我希望这些协议能确保任何一方的操作都可以花费任何时间,但也许我没有正确设计。

4

1 回答 1

3

客户端和服务器 .log 文件不会被可靠地写入 - 有时直到我终止进程之后才会写入。

如果您希望字节及时出现在磁盘上,您可能需要调用flush您的文件对象。

在客户端连接并且服务器发回消息后,协议卡住了。该消息似乎永远不会到达客户端。

服务器不向客户端发送 int32 字符串:它transport.write直接调用。客户感到困惑,因为这些最终看起来像极长的 int32 字符串。例如,“Internal server error”的前四个字节解码为整数 1702129225,因此如果服务器上出现错误并将这些字节发送到客户端,客户端将等待大约 2GB 的数据后再继续。

改为使用Int32StringReceiver.sendString

于 2013-10-07T20:45:19.910 回答