1

我正在尝试使用来自 Alvaro Videla的反向主题交换 rabbitmq 插件创建一个 celery 应用程序。工作人员似乎可以使用此交换很好地与经纪人联系,但是当我对我的任务进行主题反向路由时,不要选择“#”或“*”,就像直接交换一样。

这就是我的队列:

Queue(name='cluster', 
          exchange = Exchange(name='cluster', 
                              type='x-rtopic',
                              delivery_mode='persistent',
                              durable=True), 
          routing_key='intel.%d.%s' % (n_cores, hostname),
          durable = True,)

现在图片 2 个工作人员使用以下 routing_key

  • Worker1:intel.8.host1
  • Worker2:amd.2.host2

这就是我正在尝试的任务以及我所经历的路由键:

Routing key     | Works?   |  Result              | Expected
-------------------------------------------------------------------------
'intel'         | OK       | Nobody receives      | 
'intel.*'       | OK       | Nobody receives      |
'intel.#'       | WRONG    | Everyone receives    | just Worker1 receives
'#.host1'       | WRONG    | Everyone receives    | just Worker1 receives
'intel.*.*      | WRONG    | Everyone receives    | just Worker1 receives
'intel.*.host1  | WRONG    | Everyone receives    | just Worker1 receives
'*.2.*'         | WRONG    | Everyone receives    | just Worker2 receives
'intel.8.host1' | OK       | like direct exchange | 

为了确定问题出在哪里,我测试了使用 pika 和 kombu 进行简单消息传递的插件,并且两者都运行良好,完全符合预期。所以我认为 Celery 交换(路由)消息的方式一定有问题。也许我应该创建一个自定义路由类!?

提前致谢。

4

1 回答 1

0

经过一段时间的挖掘,我发现反向主题交换插件可以很好地与 Celery 配合使用。我误解了 Rabbitmq 队列的工作方式。为了完全使它工作,我必须定义一个我的路由器,其中任务被路由到包含这些队列的交换,并且只指定 routing_key 和交换名称,这样任务仍然会循环连接到该交换的节点和能够在任务路由键上使用通配符。

所以队列设置是这样的:

routed_queue = 'intel.8.pchost'

CELERY_QUEUES = (

    Queue(name='cluster.%s' % routed_queue,
          exchange = Exchange(name='cluster',
                              type='x-rtopic'), 
          routing_key=routed_queue),)

路由器将是这样的:

MyRouter 类(对象):

def route_for_task(self, task, args=[], kwargs={}):

    routing_key = kwargs['routing_key'] if kwargs.has_key('routing_key') and\
                  kwargs['routing_key'] else '#'

    return {'exchange': 'cluster',
            'exchange_type': 'rtopic',
            'routing_key': routing_key}

然后我会将 routing_key 作为任务的 kwargs 传递,能够在任务“intel.#”中设置,这意味着该任务将由任何具有以 intel 开头的队列的工作人员执行。

唯一的问题!这是由于某种原因,我不得不使用 .apply_async 而不是 .delay 来执行任务。

整个想法是能够根据集群中可用的机器规格相应地路由我的任务。一些任务应该只在英特尔处理器上运行,而另一些只能在 AMD 上运行,或者通过节点中的核心数量或使用主机名来定义。

希望这可以帮助任何尝试在未来做同样事情的人。

于 2014-10-29T01:45:18.400 回答