7

我尝试使用 txredis(用于 redis 的非阻塞扭曲 api)作为我正在尝试使用我正在处理的 scrapy 项目设置的持久消息队列,但没有成功。我发现虽然客户端没有阻塞,但它变得比原本应该慢得多,因为反应器循环中的一个事件被分成了数千个步骤。

因此,我尝试使用 redis-py(常规阻塞扭曲 api)并将调用包装在延迟线程中。它工作得很好,但是我想在调用 redis 时执行内部延迟,因为我想设置连接池以尝试进一步加快速度。

下面是我对延迟线程的扭曲文档中的一些示例代码的解释,以说明我的用例:

#!/usr/bin/env python
from twisted.internet import reactor,threads
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'


def aBlockingRedisCall():
    print 'doing lookup... this may take a while'
    time.sleep(10)
    return 'results from redis'

def result(res):
    print res

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)
    d = threads.deferToThread(aBlockingRedisCall)
    d.addCallback(result)
    reactor.run()

if __name__=='__main__':
    main()

这是我对连接池的更改,它使延迟线程中的代码阻塞:

#!/usr/bin/env python
from twisted.internet import reactor,defer
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'

def aBlockingRedisCall(x):
    if x<5: #all connections are busy, try later
        print '%s is less than 5, get a redis client later' % x
        x+=1
        d = defer.Deferred()
        d.addCallback(aBlockingRedisCall)
        reactor.callLater(1.0,d.callback,x)
        return d

    else: 
        print 'got a redis client; doing lookup.. this may take a while'
        time.sleep(10) # this is now blocking.. any ideas?
        d = defer.Deferred()
        d.addCallback(gotFinalResult)
        d.callback(x)
        return d

def gotFinalResult(x):
    return 'final result is %s' % x

def result(res):
    print res

def aBlockingMethod():
    print 'going to sleep...'
    time.sleep(10)
    print 'woke up'

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)


    d = defer.Deferred()
    d.addCallback(aBlockingRedisCall)
    d.addCallback(result)
    reactor.callInThread(d.callback, 1)
    reactor.run()

if __name__=='__main__':
    main()

所以我的问题是,有谁知道为什么我的更改会导致延迟线程阻塞和/或任何人都可以提出更好的解决方案?

4

3 回答 3

12

好吧,正如扭曲的文档所说:

延迟不会使代码神奇地不阻塞

每当您使用阻塞代码时,例如sleep,您都必须将其推迟到一个新线程。

#!/usr/bin/env python
from twisted.internet import reactor,defer, threads
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'

def aBlockingRedisCall(x):
    if x<5: #all connections are busy, try later
        print '%s is less than 5, get a redis client later' % x
        x+=1
        d = defer.Deferred()
        d.addCallback(aBlockingRedisCall)
        reactor.callLater(1.0,d.callback,x)
        return d

    else: 
        print 'got a redis client; doing lookup.. this may take a while'
        def getstuff( x ):
            time.sleep(3)
            return "stuff is %s" % x

        # getstuff is blocking, so you need to push it to a new thread
        d = threads.deferToThread(getstuff, x)
        d.addCallback(gotFinalResult)
        return d

def gotFinalResult(x):
    return 'final result is %s' % x

def result(res):
    print res

def aBlockingMethod():
    print 'going to sleep...'
    time.sleep(10)
    print 'woke up'

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)


    d = defer.Deferred()
    d.addCallback(aBlockingRedisCall)
    d.addCallback(result)
    reactor.callInThread(d.callback, 1)
    reactor.run()

if __name__=='__main__':
    main()

如果redis api不是很复杂,使用twisted.web重写它可能更自然,而不是仅仅在很多线程中调用阻塞api。

于 2010-03-17T22:06:48.320 回答
3

还有一个用于 twisted 的最新 Redis 客户端,它已经支持 Redis 2.x 的新协议和特性。你一定要试一试。它被称为 txredisapi。

对于持久消息队列,我建议使用 RestMQ。建立在 cyclone 和 txredisapi 之上的基于 redis 的消息队列系统。

http://github.com/gleicon/restmq

干杯

于 2010-09-10T16:58:19.920 回答
0

在相关的说明中,使用专门为 Twisted 创建的 Redis 客户端可能会收获很多,例如:http: //github.com/deldotdr/txRedis

于 2010-03-18T18:27:38.960 回答