42

某些功能应该在 Web 服务器上异步运行。发送电子邮件或数据后处理是典型的用例。

编写装饰器函数以异步运行函数的最佳(或最 Pythonic)方法是什么?

我的设置很常见:Python、Django、Gunicorn 或 Waitress、AWS EC2 标准 Linux

例如,这是一个开始:

from threading import Thread

def postpone(function):
    def decorator(*args, **kwargs):
        t = Thread(target = function, args=args, kwargs=kwargs)
        t.daemon = True
        t.start()
    return decorator

期望的用法:

@postpone
def foo():
    pass #do stuff
4

4 回答 4

69

我继续在规模和生产中使用此实现,没有任何问题。

装饰器定义:

def start_new_thread(function):
    def decorator(*args, **kwargs):
        t = Thread(target = function, args=args, kwargs=kwargs)
        t.daemon = True
        t.start()
    return decorator

示例用法:

@start_new_thread
def foo():
  #do stuff

随着时间的推移,堆栈不断更新和转换。

最初是 Python 2.4.7、Django 1.4、Gunicorn 0.17.2,现在是 Python 3.6、Django 2.1、Waitress 1.1。

如果您正在使用任何数据库事务,Django 将创建一个新连接,这需要手动关闭:

from django.db import connection

@postpone
def foo():
  #do stuff
  connection.close()
于 2015-03-07T09:11:28.677 回答
20

Celery是一个异步任务队列/作业队列。它有据可查,非常适合您的需要。我建议你从这里开始

于 2013-08-24T16:36:47.490 回答
3

在 Django 中进行异步处理最常见的方法是使用Celerydjango-celery.

于 2013-08-24T16:36:43.677 回答
2

如果没有太多的新工作,tomcounsell 的方法很有效。如果在短时间内运行许多持久的作业,因此产生大量线程,主进程将受到影响。在这种情况下,您可以使用带有协程的线程池,

# in my_utils.py

from concurrent.futures import ThreadPoolExecutor

MAX_THREADS = 10


def run_thread_pool():
    """
    Note that this is not a normal function, but a coroutine.
    All jobs are enqueued first before executed and there can be
    no more than 10 threads that run at any time point.
    """
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        while True:
            func, args, kwargs = yield
            executor.submit(func, *args, **kwargs)


pool_wrapper = run_thread_pool()

# Advance the coroutine to the first yield (priming)
next(pool_wrapper)
from my_utils import pool_wrapper

def job(*args, **kwargs):
    # do something

def handle(request):
    # make args and kwargs
    pool_wrapper.send((job, args, kwargs))
    # return a response
于 2019-01-30T02:12:18.863 回答