3

我正在将我从 Golang(使用 redis)编写的应用程序移植到 Python,我很想使用 Celery 来完成我的任务队列,但是我有一个关于路由的问题......

我的应用程序通过 REST POST 接收“事件”,其中每个“事件”可以是不同的类型。然后我想让后台工作人员等待某些类型的事件。这里需要注意的是,一个事件可能导致处理该事件的任务不止一个。例如:

一些/lib/a/tasks.py

@task
def handle_event_typeA(a,b,c):
    # handles event...
    pass

@task
def handle_event_typeB(a,b,c):
    # handles other event...
    pass

一些/lib/b/tasks.py

@task
def handle_event_typeA(a,b,c):
    # handles event slightly differently... but still same event...
    pass

总而言之......我希望能够运行 N 数量的工人(跨 X 台机器)并且这些作品中的每一个都将注册 Y 数量的任务,例如:a.handle_event_typeA、b.handle_event_typeA 等。 . 并且我希望能够将任务插入队列并让一名工作人员拿起任务并将其路由到工作人员中的多个任务(即到 a.handle_event_typeA 和 b.handle_event_typeA)。

我在这里阅读了 Kombu 的文档和这里的 Celery 的路由文档但我似乎无法弄清楚如何正确配置它。

一段时间以来,我一直在使用 Celery 进行更传统的工作流程,我对它的功能集、性能和稳定性感到非常满意。我会直接使用 Kombu 或一些自制解决方案来实现我需要的东西,但如果可能的话,我想使用 Celery。

多谢你们!我希望我不会在这个问题上浪费任何人的时间。

编辑 1

在考虑了这个问题一段时间后,我想出了一个解决方法来实现我想要的 Celery。这不是优雅的解决方案,但效果很好。我正在使用 django,它是缓存抽象(您可以直接使用 memcached 或 redis 之类的东西)。这是我想出的片段:

from django.core.cache import cache
from celery.execute import send_task

SUBSCRIBERS_KEY = 'task_subscribers.{0}'

def subscribe_task(key, task):
    # get current list of subscribers
    cache_key = SUBSCRIBERS_KEY.format(key)
    subscribers = cache.get(cache_key) or []
    # get task name
    if hasattr(task, 'delay'):
        name = task.name
    else:
        name = task
    # add to list
    if not name in subscribers:
        subscribers.append(name)
    # set cache
    cache.set(cache_key, subscribers)

def publish_task(key, *kargs):
    # get current list of subscribers
    cache_key = SUBSCRIBERS_KEY.format(key)
    subscribers = cache.get(cache_key) or []
    # iterate through all subscribers and execute task
    for task in subscribers:
        # send celery task
        send_task(task, args=kargs, kwargs={})

然后我要做的是通过执行以下操作来订阅不同模块中的任务:

subscribe_task('typeA', 'some.lib.b.tasks.handle_event_typeA')

然后我可以在处理 REST 事件时调用发布任务方法。

4

0 回答 0