我正在尝试从我的 Falcon API 方法运行一些戏剧演员,如下所示:
def on_post(self, req, resp):
begin_id = int(req.params["begin_id"])
count = int(req.params["count"])
for page_id in range(begin_id, begin_id + count):
process_vk_page.send(f"https://vk.com/id{page_id}")
resp.status = falcon.HTTP_200
我的代码进入“发送”方法,通过循环没有任何问题。但是队列中没有新任务!Actor 本身没有被调用,并且我的代理中的“默认”队列是空的。如果我设置自定义队列,它仍然是空的。我的演员长这样:
@dramatiq.actor(broker=broker)
def process_vk_page(link: str):
pass
经纪人在哪里
broker = RabbitmqBroker(url="amqp://guest:guest@rabbitmq:5672")
RabbitMQ 日志告诉它连接正常
我在调试器中做了一些额外的研究。它可以很好地获取消息(这意味着要发送到代理),并且Actor.send_with_options () 中的 broker.enqueue 不返回任何异常,尽管我无法真正了解它的内部逻辑。我真的不知道它为什么会失败,但肯定是RabbitmqBroker.enqueue()导致了问题。
Broker 是 Erlang 22.2.1 上的 RabbitMQ 3.8.2,使用默认设置从rabbitmq Docker Hub 映像在 Docker 中运行。Dramatiq 版本是 1.7.0。
在 RabbitMQ 日志中,只有在应用程序启动时连接到代理,而在我关闭它时断开连接,如下所示:
2020-01-05 08:25:35.622 [info] <0.594.0> accepting AMQP connection <0.594.0> (172.20.0.1:51242 -> 172.20.0.3:5672)
2020-01-05 08:25:35.627 [info] <0.594.0> connection <0.594.0> (172.20.0.1:51242 -> 172.20.0.3:5672): user 'guest' authenticated and granted access to vhost '/'
2020-01-05 08:28:35.625 [error] <0.597.0> closing AMQP connection <0.597.0> (172.20.0.1:51246 -> 172.20.0.3:5672):
missed heartbeats from client, timeout: 60s
Broker 在__init__.py
主包中定义,在子包中导入。我不确定在所有函数的装饰器中指定相同的代理实例是否可以,但是文档中没有任何禁止它的地方。我想这没关系,因为如果我为每个 Actor 创建新的代理,它仍然不起作用。
我试图将 Redis 设置为代理,但我仍然遇到同样的问题。
这可能是什么原因?