我正在尝试在 Twisted 中制作一个简单的分布式作业客户端/服务器系统。基本上步骤是:
- 使用一些作业和相关文件启动 JobServer
- 启动 JobClient 实例,它们连接到 JobServer 并请求 Jobs
- 服务器提供 JobClient 作业并通过 TCP 发送序列化 JSON
- 经过大量计算后,JobClient 发回结果并等待新作业
- 冲洗并重复
但是我无法在本地机器上调试我的协议。
作业服务器.py
from twisted.application import internet, service
from twisted.internet import reactor, protocol, defer
from twisted.protocols import basic
from twisted.protocols.basic import Int32StringReceiver
from twisted.web import client
import random
import json
import base64
from logger import JobLogger
class JobServerProtocol(Int32StringReceiver):
log = JobLogger("server.log")
def connectionMade(self):
self.log.write("Connected to client")
self.sendJob(None)
def stringReceived(self, msg):
self.log.write("Recieved job from client: %s" % msg)
self.sendJob(msg)
def sendJob(self, msg):
d = self.factory.getJob(msg)
def onError(err):
self.transport.write("Internal server error")
d.addErrback(onError)
def sendString(newjob_dict):
encoded_str = json.dumps(newjob_dict)
self.transport.write(encoded_str)
self.log.write("Sending job to client: %s" % encoded_str)
d.addCallback(sendString)
def lengthLimitExceeded(self, msg):
self.transport.loseConnection()
class JobServerFactory(protocol.ServerFactory):
protocol = JobServerProtocol
def __init__(self, jobs, files):
assert len(jobs) == len(files)
self.jobs = jobs
self.files = files
self.results = []
def getJob(self, msg):
# on startup the client will not have a message to send
if msg:
# recreate pickled msg
msg_dict = json.loads(msg)
self.results.append((msg_dict['result'], msg_dict['jidx']))
# if we're all done, let the client know
if len(self.jobs) == 0:
job = None
jidx = -1
encoded = ""
else:
# get new job for client to process
jidx = random.randint(0, len(self.jobs) - 1)
job = self.jobs[jidx]
del self.jobs[jidx]
# get file
with open(self.files[jidx], 'r') as f:
filecontents = f.read()
encoded = base64.b64encode(filecontents)
# create dict object to send to client
response_msg = {
"job" : job,
"index" : jidx,
"file" : encoded
}
return defer.succeed(response_msg)
# args for factory
files = ['test.txt', 'test.txt', 'test.txt']
jobs = ["4*4-5", "2**2-5", "2/9*2/3"]
application = service.Application('jobservice')
factory = JobServerFactory(jobs=jobs, files=files)
internet.TCPServer(12345, factory).setServiceParent(
service.IServiceCollection(application))
作业客户端.py
from twisted.internet import reactor, protocol
from twisted.protocols.basic import Int32StringReceiver
import json
import time
from logger import JobLogger
class JobClientProtocol(Int32StringReceiver):
log = JobLogger("client.log")
def stringReceived(self, msg):
# unpack job from server
server_msg_dict = json.loads(msg)
job = server_msg_dict["job"]
index = server_msg_dict["index"]
filestring = server_msg_dict["file"]
if index == -1:
# we're done with all tasks
self.transport.loseConnection()
self.log.write("Recieved job %d from server with file '%s'" % (index, filestring))
# do something with file
# job from the server...
time.sleep(5)
result = { "a" : 1, "b" : 2, "c" : 3}
result_msg = { "result" : result, "jidx" : index }
self.log.write("Completed job %d from server with result '%s'" % (index, result))
# serialize and tell server
result_str = json.dumps(result_msg)
self.transport.write(encoded_str)
def lengthLimitExceeded(self, msg):
self.transport.loseConnection()
class JobClientFactory(protocol.ClientFactory):
def buildProtocol(self, addr):
p = JobClientProtocol()
p.factory = self
return p
reactor.connectTCP("127.0.0.1", 12345, JobClientFactory())
reactor.run()
日志记录.py
class JobLogger(object):
def __init__(self, filename):
self.log = open(filename, 'a')
def write(self, string):
self.log.write("%s\n" % string)
def close(self):
self.log.close()
仅使用一个客户端在本地运行和测试:
$ twistd -y JobServer.py -l ./jobserver.log --pidfile=./jobserver.pid
$ python JobClient.py
我遇到的问题:
- 客户端和服务器 .log 文件不会被可靠地写入 - 有时直到我终止进程之后才会写入。
- 在客户端连接并且服务器发回消息后,协议卡住了。该消息似乎永远不会到达客户端。
一般来说,我希望这些协议能确保任何一方的操作都可以花费任何时间,但也许我没有正确设计。