5

我正在尝试从我的 Falcon API 方法运行一些戏剧演员,如下所示:

def on_post(self, req, resp):
    begin_id = int(req.params["begin_id"])
    count = int(req.params["count"])

    for page_id in range(begin_id, begin_id + count):
        process_vk_page.send(f"https://vk.com/id{page_id}")

    resp.status = falcon.HTTP_200

我的代码进入“发送”方法,通过循环没有任何问题。但是队列中没有新任务!Actor 本身没有被调用,并且我的代理中的“默认”队列是空的。如果我设置自定义队列,它仍然是空的。我的演员长这样:

@dramatiq.actor(broker=broker)
def process_vk_page(link: str):
   pass

经纪人在哪里

broker = RabbitmqBroker(url="amqp://guest:guest@rabbitmq:5672")

RabbitMQ 日志告诉它连接正常

我在调试器中做了一些额外的研究。它可以很好地获取消息(这意味着要发送到代理),并且Actor.send_with_options () 中的 broker.enqueue 不返回任何异常,尽管我无法真正了解它的内部逻辑。我真的不知道它为什么会失败,但肯定是RabbitmqBroker.enqueue()导致了问题。

Broker 是 Erlang 22.2.1 上的 RabbitMQ 3.8.2,使用默认设置从rabbitmq Docker Hub 映像在 Docker 中运行。Dramatiq 版本是 1.7.0。

在 RabbitMQ 日志中,只有在应用程序启动时连接到代理,而在我关闭它时断开连接,如下所示:

2020-01-05 08:25:35.622 [info] <0.594.0> accepting AMQP connection <0.594.0> (172.20.0.1:51242 -> 172.20.0.3:5672)
2020-01-05 08:25:35.627 [info] <0.594.0> connection <0.594.0> (172.20.0.1:51242 -> 172.20.0.3:5672): user 'guest' authenticated and granted access to vhost '/'
2020-01-05 08:28:35.625 [error] <0.597.0> closing AMQP connection <0.597.0> (172.20.0.1:51246 -> 172.20.0.3:5672):
missed heartbeats from client, timeout: 60s

Broker 在__init__.py主包中定义,在子包中导入。我不确定在所有函数的装饰器中指定相同的代理实例是否可以,但是文档中没有任何禁止它的地方。我想这没关系,因为如果我为每个 Actor 创建新的代理,它仍然不起作用。

我试图将 Redis 设置为代理,但我仍然遇到同样的问题。

这可能是什么原因?

4

1 回答 1

5

最有可能的问题是您没有告诉工作人员使用哪个代理,因为您没有声明默认代理。

你没有提到你的文件在你的应用程序中是如何布局的,但是,假设你的代理被定义为brokerinside tasks.py,那么你必须让你的工作人员知道它,如下所示:

dramatiq tasks:broker

dramatiq --help有关更多信息和模式,请参阅末尾的示例。

于 2020-01-06T07:27:25.970 回答