我正在构建一个服务器,它使用 Twisted Python 在 Redis 之上存储键/值数据。服务器通过 HTTP 接收 JSON 字典,将其转换为 Python 字典并放入缓冲区。每次存储新数据时,服务器都会安排一个任务,该任务会使用 txredis 客户端从缓冲区中弹出一个字典并将每个元组写入 Redis 实例。
class Datastore(Resource):
isLeaf = True
def __init__(self):
self.clientCreator = protocol.ClientCreator(reactor, Redis)
d = self.clientCreator.connectTCP(...)
d.addCallback(self.setRedis)
self.redis = None
self.buffer = deque()
def render_POST(self, request):
try:
task_id = request.requestHeaders.getRawHeaders('x-task-id')[0]
except IndexError:
request.setResponseCode(503)
return '<html><body>Error reading task_id</body></html>'
data = json.loads(request.content.read())
self.buffer.append((task_id, data))
reactor.callLater(0, self.write_on_redis)
return ' '
@defer.inlineCallbacks
def write_on_redis(self):
try:
task_id, dic = self.buffer.pop()
log.msg('Buffer: %s' % len(self.buffer))
except IndexError:
log.msg('buffer empty')
defer.returnValue(1)
m = yield self.redis.sismember('DONE', task_id)
# Simple check
if m == '1':
log.msg('%s already stored' % task_id)
else:
log.msg('%s unpacking' % task_id)
s = yield self.redis.sadd('DONE', task_id)
d = defer.Deferred()
for k, v in dic.iteritems():
k = k.encode()
d.addCallback(self.redis.push, k, v)
d.callback(None)
基本上,我面临两个不同连接之间的生产者/消费者问题,但我不确定当前的实现是否在 Twisted 范式中运行良好。我已经阅读了 Twisted 中关于生产者/消费者接口的小文档,但我不确定是否可以在我的案例中使用它们。欢迎任何批评:经过多年的线程并发,我正试图掌握事件驱动编程。