10

这是一些说明我的问题的代码:

def blocking1():
    while True:
        yield 'first blocking function example'

def blocking2():
    while True:
        yield 'second blocking function example'

for i in blocking1():
    print 'this will be shown'

for i in blocking2():
    print 'this will not be shown'

我有两个包含while True循环的函数。这些将产生数据,然后我将记录在某个地方(很可能是一个 sqlite 数据库)。

我一直在玩线程并让它工作。但是,我真的不喜欢它......我想做的是让我的阻塞函数异步。就像是:

def blocking1(callback):
    while True:
        callback('first blocking function example')

def blocking2(callback):
    while True:
        callback('second blocking function example')

def log(data):
    print data

blocking1(log)
blocking2(log)

如何在 Python 中实现这一点?我已经看到标准库带有 asyncore,并且这个游戏中的大牌是 Twisted,但这两个似乎都用于套接字 IO。

如何异步我的非套接字相关的阻塞函数?

4

5 回答 5

30

阻塞函数是一个不返回但仍使您的进程处于空闲状态的函数 - 无法完成更多工作。

您要求我们将您的阻塞功能设为非阻塞。然而——除非你正在编写一个操作系统——没有任何阻塞函数。您可能具有阻塞的函数,因为它们调用阻塞系统调用,或者您可能具有“阻塞”的函数,因为它们进行大量计算。

如果不使底层系统调用非阻塞,就不可能使前一种类型的函数成为非阻塞。根据该系统调用是什么,如果不向程序添加事件循环,可能很难使其成为非阻塞;您不仅需要拨打电话并让它不阻塞,您还必须拨打另一个电话以确定该调用的结果将传递到您可以关联它的地方。

这个问题的答案是一个很长的 python 程序和很多关于不同操作系统接口以及它们如何工作的解释,但幸运的是我已经在另一个网站上写了这个答案;我称之为Twisted。如果 Twisted reactor 已经支持您的特定任务,那么您很幸运。否则,只要您的任务映射到一些现有的操作系​​统概念,您就可以扩展一个反应器来支持它。实际上,这些机制中只有 2 种:每个明智的操作系统上的文件描述符,以及 Windows 上的 I/O 完成端口。

在另一种情况下,如果你的函数消耗了大量 CPU,因此没有返回,它们并不是真正的阻塞;您的过程仍在进行中并完成工作。有三种方法可以处理它:

  • 单独的线程
  • 单独的进程
  • 如果您有一个事件循环,请编写一个定期产生的任务,通过以这样的方式编写任务,使其完成一些工作,然后要求事件循环在不久的将来恢复它以允许其他任务运行。

在 Twisted 中,这最后一种技术可以通过多种方式实现,但这里有一个语法上方便的技巧,可以让它变得简单:

from twisted.internet import reactor
from twisted.internet.task import deferLater
from twisted.internet.defer import inlineCallbacks, returnValue

@inlineCallbacks
def slowButSteady():
    result = SomeResult()
    for something in somethingElse:
        result.workHardForAMoment(something)
        yield deferLater(reactor, 0, lambda : None)
    returnValue(result)
于 2011-02-12T03:40:33.770 回答
12

您可以使用生成器进行协作多任务处理,但您必须编写自己的主循环来在它们之间传递控制。

这是使用上面示例的(非常简单的)示例:

def blocking1():
    while True:
        yield 'first blocking function example'

def blocking2():
    while True:
        yield 'second blocking function example'


tasks = [blocking1(), blocking2()]

# Repeat until all tasks have stopped
while tasks:
    # Iterate through all current tasks. Use
    # tasks[:] to copy the list because we
    # might mutate it.
    for t in tasks[:]:
        try:
            print t.next()
        except StopIteration:
            # If the generator stops, remove it from the task list
            tasks.remove(t)

您可以通过允许生成器产生新的生成器来进一步改进它,然后可以将其添加到任务中,但希望这个简化的示例能够给出一般的想法。

于 2011-02-11T07:06:57.070 回答
2

扭曲的框架不仅仅是套接字。它具有适用于许多场景的异步适配器,包括与子流程交互。我建议仔细看看。它做你想做的事。

于 2011-02-11T07:41:57.450 回答
1

如果您不想使用完整的操作系统线程,您可以尝试Stackless,它是 Python 的变体,增加了许多有趣的功能,包括“微线程”。有许多很好的例子,你会发现它们很有帮助。

于 2011-02-11T06:17:01.257 回答
0

您的代码没有阻塞。blocking1() 并且它的兄弟立即返回迭代器(不阻塞),并且单个迭代块也不会(在您的情况下)。

如果你想一个接一个地从两个迭代器中“吃掉”,不要让你的程序尝试完全吃掉“blocking1()”,然后再继续......

for b1, b2 in zip(blocking1(), blocking2()):
    print 'this will be shown', b1, 'and this, too', b2
于 2013-11-10T03:50:11.470 回答