1

Tornado 和 Redis 的新手,并开始实施侦听器/工作人员设置。

我希望能够将 LPUSH 任务放到队列中并将它们 RPOP 关闭。BRPOP 似乎是弹出它们的最佳方式,因为如果当前没有,它将等待添加一个。问题是,每当我使用它时,它永远不会返回......但是当我使用 RPOP 时,我会按预期获得队列中的下一个项目。

class ListenHandler(tornado.websocket.WebSocketHandler):
    uid = ''
    CHANNEL_TPL = "client_%s"
    RESPONSE_TPL = '{"command":"%s","rid":"%s","status":"%s","result":%s}'
    def open(self):
        # new websocket connection is established from a client
        print "open socket"
        self.uid = session = uuid4()        

    def on_message(self, message):
        print "on_message called [%s]" % message
        try:
            m = json.loads(message)
        except ValueError:
            self.write_message('BAD')
            return

        # check for RID (request id)
        if not 'rid' in m:
            self.write_message('error: unspecified rid')
            return

        # confirm receipt of data
        confirm_string = '%s OK' % (m['rid'])
        self.write_message(confirm_string)

        # check for command
        if not 'command' in m:
            response = '%s error: unspecified command' % (m['rid'])
            self.write_message(response)
            return

        # process commands
        if m['command'] == 'register':
            self._register(m['rid'])
        elif m['command'] == 'get_canvas':
            self._queue_command('read', m)
        elif m['command'] == 'save_canvas':
            if 'data' in m:
                self._queue_command('write', m)
            else:
                response = '%s unspecified data' % (m['rid'])
                self.write_message(response)
                return
        elif m['command'] == 'list_command_queue':
            self._list_command_queue(m['rid'])
        elif m['command'] == 'get_read_job':
            self._get_read_job(m['rid'])
        elif m['command'] == 'get_write_job':
            self._get_write_job(m['rid'])
        else:
            # no commands recognized
            response = '%s error: unknown command' % (m['rid'])
            self.write_message(response)
            return
        print "end of on_message()\n"

    def callback(self, data):
        print "- callback()"
        self.write_message(data)

    def on_close(self):
        # websocket connection is closed by client
        pass

    def _register(self, rid):
        data = '{"uid":"%s"}' % (self.uid)
        response = self.RESPONSE_TPL % ('register', rid, 'completed', data)
        self.callback(response)

    @tornado.web.asynchronous
    @tornado.gen.engine 
    def _queue_command(self, type, m):
        channel = self.CHANNEL_TPL % (type)
        print "pushing job to %s ... data[%s]" % (channel, m)
        yield tornado.gen.Task(self.application.rdb.lpush, channel, m)
        return

    def _list_command_queue(self, rid):
        channel_r = self.CHANNEL_TPL  % ('read')
        channel_w = self.CHANNEL_TPL  % ('write')
        data = '{"client_read":"%s","client_write":"%s"}' % (self.application.rdb.llen(channel_r), self.application.rdb.llen(channel_w))
        response = self.RESPONSE_TPL % ('list_command_queue', rid, 'completed', data)
        print "list_command_queue [%s]" % (response)
        self.callback(response)

    @tornado.web.asynchronous
    @tornado.gen.engine  
    def _get_read_job(self, rid):
        channel = self.CHANNEL_TPL % ('read')
        data = yield tornado.gen.Task(self.application.rdb.rpop, (channel))
        response = self.RESPONSE_TPL % ('get_read_job', rid, 'completed', data)
        self.callback(response)

    @tornado.web.asynchronous
    @tornado.gen.engine    
    def _get_write_job(self, rid):
        channel = self.CHANNEL_TPL  % ('write')
        data = yield tornado.gen.Task(self.application.rdb.rpop, (channel))
        response = self.RESPONSE_TPL % ('get_write_job', rid, 'completed', data)
        self.callback(response) 

上面的类将接受识别的命令并将它们 LPUSH 到两个不同队列之一(一个用于写入 SQL DB 的作业的“写入”队列,以及一个用于只读作业的“读取”队列)。理论上,在不同的机器上会有许多“工作”线程使用 BRPOP 来获取这些命令并执行它们。不过,现在,我正在使用同一个侦听器来测试队列中的内容。

函数 get_write_jobs 和 get_read_jobs 将返回队列中的下一个条目,没有问题。但是,如果我向任何一种方法添加“b”(brpop),该函数将永远不会调用回调。似乎只是锁定,等待下一个可用条目,但那里有条目。

知道这里发生了什么吗?我误解了BRPOP的目的吗?

谢谢,尼克

4

1 回答 1

3

也许有点晚了,但我想我可以帮忙。

我今天也遇到了同样的问题,它几乎让我发疯。最后,经过大量调试后,我通过将密钥或通道传递给列表(例如 [通道])中的 blpop/brpop 解决了它。

tornado-redis 的 brpop/blpop 在内部所做的是将键转换为列表,但是当只接收到一个键时,它会将字符串转换为字符列表(简直太神奇了......),这就是调用之后阻塞的原因,它正在等待各种列表中的新项目,其名称对应于原始密钥的所有字符。

于 2012-08-23T17:32:38.463 回答