0

我正在尝试将 Pycurl 与 Gevent 一起使用以执行 HTTP 上传。为此,我依赖geventcurl.py模块,它改变了 libcurl 的Multi API 以使用 Gevent 的事件循环。

问题在于READFUNCTION回调。此回调在HUB上下文中执行,因此我们不能wait(),但此回调必须返回要上传的数据,在我的情况下,此数据来自阻塞源。

这是演示该问题的代码片段:

#!/usr/bin/env python
from gevent import monkey; monkey.patch_all()
import gevent
from gevent.queue import Queue
import pycurl
from geventcurl import Curl

URL = 'http://localhost:8000/'

class QueueReader:
    def __init__(self, q):
        self.q = q
    def read_callback(self, size):
        return self.q.get(timeout=10)

dataq = Queue(10)
c = Curl()
c.setopt(pycurl.URL, URL)
c.setopt(pycurl.UPLOAD, 1)
c.setopt(pycurl.READFUNCTION, QueueReader(dataq).read_callback)

# Start transfer
g = gevent.spawn(c.perform)
for i in xrange(10):
    dataq.put(str(i))
    gevent.sleep(1)
g.join()
c.close()

要运行代码片段,您只需要在 localhost:8000 上进行监听即可nc -l 8000。发生的情况是,由于read_callback()在 HUB 上下文中执行,它不会等待,如果队列为空,它将立即引发 Empy 异常。使用 AsyncResult 也无济于事,因为我们必须等待()才能得到结果。

有没有办法在事件循环回调中从可能阻塞的源中获取数据?

4

1 回答 1

0

如果您摆脱timeout=10,它将等待来自阻塞源的数据。(参见:http ://www.gevent.org/gevent.queue.html ,默认行为。)

如果 read_callback() 的阻塞会干扰循环的操作for i in xrange(10),则将该循环通过例如gevent.spawn().

另请注意,put()如果队列已满,默认情况下将阻塞。考虑put_nowait(), put(timeout=...), 或put(block=False)更改默认行为。

于 2012-10-03T16:27:18.743 回答