1

我认为需要大量内存并且是异步的。我可以限制同时在处理程序函数中工作的连接数吗(比如内部有 N 个最大工作人员的关键部分)。

这在龙卷风中可能吗?

喜欢:

@tornado.web.asynchronous
def get(self):
    with critical_section(count=5):
    # some code

谢谢

4

2 回答 2

3

Toro提供的同步原语类似于 Tornado 协程的线程模块中的同步原语。您可以使用它的 BoundedSemaphore 来控制处理程序主体的入口:

# global semaphore
sem = toro.BoundedSemaphore(5)

@gen.coroutine
def get(self):
    with (yield sem.acquire()):
        # do work
于 2013-10-02T15:18:57.267 回答
1

简短的回答:

据我了解 Tornado 和其他使用基于 Future/Deferred/generator 并发的框架,这是不可能的。然而,使用高阶函数绝对是可能的,即一个critical_section()辅助函数,它将with-block 的主体作为参数。

长答案:

据我所知,Tornado 的并发性与 Twisted 的工作方式非常相似。这意味着非阻塞调用仅限于使用Futures 和yield(基于 Twisted@inlineCallbacks或 Tornado 中的等价物)。

为了实现critical_section上下文管理器,它必须在内部与反应器合作;这只能使用回调或yield. 但是,两者都不能与上下文管理器组合。

我实际上已经抛出了一些代码,直到我记得这一点。这是我想出的代码:

import sys
from contextlib import contextmanager
from collections import defaultdict

from tornado.concurrent import Future


_critical_sections = defaultdict(lambda: (0, []))


@contextmanager
def critical_section(count):
    # get the code location of the critical section
    frame = sys._getframe()
    orig_caller = frame.f_back.f_back
    lineno = orig_caller.f_lineno
    filename = orig_caller.f_code.co_filename
    loc = (filename, lineno)

    count, waiters = _critical_sections[loc]
    if count > 5:
        future = Future()
        _critical_sections[loc] = (count + 1, waiters + [future])
        # XXX: not possible; either have to set a callback or use yield, but
        # then this context manager itself would not work as you'd expect:
        future.wait()  # <---- not possible in Tornado nor Twisted; only in Gevent/Eventlet
        fn(*args, **kwargs)
    else:
        _critical_sections[loc] = (count + 1, waiters)
        try:
            yield
        finally:
            count, waiters = _critical_sections[loc]
            _, w_future = waiters[0]
            _critical_sections[loc] = (count, waiters[1:])
            w_future.set_result(None)

(无论如何我都没有测试过它,而且它无论如何都不能在 Tornado 上运行。)

现在,如果您对建议的方法感到满意,这里有一些可以帮助您入门的东西(或者它甚至可以开箱即用):

...

def _critical_section(count, fn, *args, **kwargs):
    ...
    if count > 5:
        future = Future()
        future.add_done_callback(lambda _: fn(*args, **kwargs))
        _critical_sections[loc] = (count + 1, waiters + [future])
        # XXX: not possible; either have to set a callback or use yield, but
        # then this context manager itself would not work as you'd expect:
        return future
    else:
        _critical_sections[loc] = (count + 1, waiters)
        try:
            return fn()
        finally:
            ... # same

那么你可以把它变成一个装饰器:

from functools import wraps

def critical_section(count):
    def decorate(fn):
        @wraps(fn)
        def ret(*args, **kwargs):
            return _critical_section(count, fn, *args, **kwargs)
        return ret
    return decorate

用法:

@tornado.web.asynchronous
def get(self):
    @critical_section(count=5)
    def some_code():
        pass  # do stuff

此外,代码正在使用sys._getframe(),它(至少)有两个含义:

  • 它会使代码在 PyPy 上运行时变慢(直到/除非 PyPy 能够 JIT 编译使用 的函数sys._getframe),但大多数情况下,当涉及到 Web 代码时,这是一个可以接受的权衡
  • 我认为如果您将其编译.pyc并删除该代码将无法工作.py-它不应该能够确定调用代码的文件名和行号,因此(可能)无法唯一区分关键部分的位置,在这种情况下,您必须使用锁定对象。

注意:上下文管理器版本在 Gevent ( http://gevent.org ) 或 Eventlet 上是完全可行的。

于 2013-10-02T11:14:09.020 回答