4

我在 QT 应用程序中使用 requests 模块中的生成器函数,与请求流示例中的几乎相同:

import json
import requests

def get_stream():
    r = requests.get('http://httpbin.org/stream/20', stream=True)
    for line in r.iter_lines():
        if line:
            yield json.loads(line)

def consume_stream():
    for message in get_stream():
       #do something

但是,当没有传入响应时(从 Twitters Streaming API 不规则地传入推文),生成器get_stream将阻塞该consume_stream方法。

这可能发生在生成器没有立即产生,但必须等待传入消息等的任何情况下,因此会阻塞消费者。

Python中是否有任何模式可以以非阻塞方式使用生成器,即如果生成器产生,则处理它的结果,否则在下一个结果出现之前做其他事情?

4

4 回答 4

2

看看生产者-消费者模式。它通常在 python 中使用Queue.

生产者,通常在一个线程或另一个进程中运行(Queue都支持),只是将消息放入队列中。消费者,只要有感觉,就会从队列中弹出消息。此操作支持timeout参数。

于 2013-11-01T22:41:20.427 回答
1

正如西蒙在评论中所问的那样,它不能像您在示例中描述的那样简单。有很多细节需要注意。根据您的用例,有不同的解决方案或多或少有意义。你没有提供太多关于你真正想做的事情的细节,所以我只是将你发送到http://twistedmatrix.com/trac/wiki/QTReactor作为示例。有实现异步消息队列的不同解决方案/框架。我认为,这就是你要找的。

于 2013-11-01T22:45:56.303 回答
1

如果您控制生成器功能,一种解决方案是让它在超时后抛出异常。也许是这样的:

def get_stream(timeout=None):
    while message=read_message(timeout=timout):
        yield message

如果发生超时条件,则让 read_message 抛出 TimeOutException 或其他东西。

当然,您仍然必须处理何时/如何重试/恢复的后勤问题。

于 2013-11-02T02:54:09.343 回答
0

您可以从 python 3.6 https://www.python.org/dev/peps/pep-0525/开始使用异步生成器

import json
import requests

async def get_stream():
    r = requests.get('http://httpbin.org/stream/20', stream=True)
    for line in r.iter_lines():
        if line:
            yield json.loads(line)

async def consume_stream():
   await for message in get_stream():
       #do something
于 2020-11-13T20:39:17.023 回答