3

我正在构建一个服务器,它使用 Twisted Python 在 Redis 之上存储键/值数据。服务器通过 HTTP 接收 JSON 字典,将其转换为 Python 字典并放入缓冲区。每次存储新数据时,服务器都会安排一个任务,该任务会使用 txredis 客户端从缓冲区中弹出一个字典并将每个元组写入 Redis 实例。

class Datastore(Resource):

isLeaf = True

def __init__(self):
    self.clientCreator = protocol.ClientCreator(reactor, Redis)
    d = self.clientCreator.connectTCP(...)
    d.addCallback(self.setRedis)
    self.redis = None
    self.buffer = deque()


def render_POST(self, request):
    try:
        task_id = request.requestHeaders.getRawHeaders('x-task-id')[0]
    except IndexError:
        request.setResponseCode(503)
        return '<html><body>Error reading task_id</body></html>'  

    data = json.loads(request.content.read())
    self.buffer.append((task_id, data))
    reactor.callLater(0, self.write_on_redis)
    return ' '

@defer.inlineCallbacks 
def write_on_redis(self):
    try:
        task_id, dic = self.buffer.pop()
        log.msg('Buffer: %s' % len(self.buffer))
    except IndexError:
        log.msg('buffer empty')
        defer.returnValue(1)

    m = yield self.redis.sismember('DONE', task_id)
    # Simple check
    if m == '1':
        log.msg('%s already stored' % task_id)
    else:
        log.msg('%s unpacking' % task_id)
        s = yield self.redis.sadd('DONE', task_id)

        d = defer.Deferred()
        for k, v in dic.iteritems():
            k = k.encode()
            d.addCallback(self.redis.push, k, v)

        d.callback(None)

基本上,我面临两个不同连接之间的生产者/消费者问题,但我不确定当前的实现是否在 Twisted 范式中运行良好。我已经阅读了 Twisted 中关于生产者/消费者接口的小文档,但我不确定是否可以在我的案例中使用它们。欢迎任何批评:经过多年的线程并发,我正试图掌握事件驱动编程。

4

1 回答 1

3

IProducerTwisted和中的生产者和消费者 APIIConsumer是关于流控制的。您在这里似乎没有任何流量控制,您只是将消息从一个协议中继到另一个协议。

由于没有流量控制,缓冲区只是额外的复杂性。您可以通过将数据直接传递给方法来摆脱它write_on_redis。这种方式write_on_redis不需要处理空缓冲区的情况,也不需要资源上的额外属性,甚至可以摆脱callLater(尽管即使保留缓冲区也可以这样做)。

不过,我不知道这是否能回答你的问题。至于这种方法是否“运作良好”,以下是我通过阅读代码注意到的事情:

  • 如果数据到达的速度比 redis 接受的快,你的未完成作业列表可能会变得任意大,导致你的内存不足。这就是流量控制的帮助。
  • 如果没有围绕sismember调用或sadd调用进行错误处理,如果其中任何一个失败,您可能会丢失任务,因为您已经将它们从工作缓冲区中弹出。
  • 将推送作为回调Deferred d也意味着任何失败的推送都将阻止其余数据被推送。它还将Deferred返回的结果push (我假设它返回 a Deferred)作为第一个参数传递给下一次调用,因此除非push或多或少忽略它的第一个参数,否则您不会将正确的数据推送到 redis。

如果要实现流控制,则需要让 HTTP 服务器检查新任务的长度self.buffer并可能拒绝新任务 -不要将其添加到self.buffer客户端并将一些错误代码返回给客户端。你仍然不会使用IConsumerand IProducer,但它有点相似。

于 2011-03-02T00:45:12.813 回答