问题标签 [pika]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - Python 如何检测到我的 RabbitMQ 密码失败?
我正在尝试通过Pika 库编写一个使用 RabbitMQ 的 Python 应用程序。我正在使用最新版本,0.9.5。我的问题是我的 Python 代码无法检测到它的 RabbitMQ 用户名和密码何时不正确,因为我不知道如何注册一个通知我错误的 Pika 回调。我的代码正在建立这样的连接:
如果我运行此脚本,则ioloop
运行时间超过三秒钟,然后程序终止。我无法弄清楚如何注册回调,甚至无法简单地检查死连接的状态,以确定发生错误或具体找出这是一个错误的密码错误。我是否错过了 Pika 文档中的重要内容?
有人可以帮我理解 Pika 0.9.5 错误处理应该如何工作吗?或者 0.9.5 是某种可怕的实验,我应该通过使用早期版本的 Pika 来避免,现在我注意到这封电子邮件似乎表明这条新的开发路线已成为死胡同?
谢谢你的帮助!
python - AMQP 中的消息路由
我想用 AMQP 做一些路由魔术。我的设置是 Python 与 Pika 在消费者/生产者端和 RabbitMQ 用于 AMQP 服务器。
我想达到的目标:
- 向单个交换机发送消息
- (在这里插入魔法)
像这样消费消息:
- 一组订阅者应该能够根据路由键进行检索
一组订阅者应该只收到所有消息。
棘手的部分是,如果第二组中的任何服务器已收到消息,则第二组中的其他服务器将不会收到该消息。第一组中的所有服务器应该仍然能够使用此消息。
这是否可以通过单个basic_publish
呼叫实现,或者我是否需要将消息发送到路由交换(对于第一组消费者)并发送到第二组消费者的“全局”交换?
澄清:
我想要实现的是一次调用来发布一条消息并让它被 2 组不同的消费者接收。
案例1:只接收基于路由键的消息(即带有路由键的消息
foo
将被当前对该主题感兴趣的所有消费者接收)案例 2:这基本上类似于RabbitMQ Tutorial for Worker Queues。有许多工人将接收以循环方式发送的消息。只有一个工人会收到一条消息
尽管如此,对某个路由密钥感兴趣的消费者收到的消息应该与工作人员收到的消息完全相同,由单个 API 调用产生。
(希望我的问题有意义我不太熟悉 AMQP 术语)
python - 我可以在守护进程中运行 pika 的 connection.ioloop.start() 吗?
我正在尝试设置一个工作守护程序来处理来自 rabbitmq 的消息。我正在使用 pika 及其 SelectConnection。如果我不将其作为守护程序运行,则该代码可以正常工作。我可以用
和
成功地。但是,当我添加
到worker.py,代码虽然没有引发任何异常,但会停止从队列中获取消息并最大化我的CPU利用率。worker.py 看起来和这个例子一模一样。
谢谢。
rabbitmq - 如何从我的连接自己的通道以外的其他通道恢复未确认的 AMQP 消息?
似乎我让 rabbitmq 服务器运行的时间越长,未确认的消息就越麻烦。我很想重新排队。实际上,似乎有一个 amqp 命令可以执行此操作,但它仅适用于您的连接正在使用的通道。我构建了一个小鼠兔脚本至少可以尝试一下,但是我要么遗漏了一些东西,要么无法以这种方式完成(rabbitmqctl 怎么样?)
python - 使用 Python、Pika 和 AMQP 设计异步 RPC 应用程序的最佳模式是什么?
我的应用程序的生产者模块由想要提交要在小型集群上完成的工作的用户运行。它通过 RabbitMQ 消息代理以 JSON 格式发送订阅。
我尝试了几种策略,到目前为止最好的是以下策略,但仍未完全奏效:
每台集群机器运行一个消费者模块,它自己订阅 AMQP 队列并发出一个prefetch_count来告诉代理它一次可以运行多少个任务。
我能够使用 Pika AMQP 库中的 SelectConnection 使其工作。消费者和生产者都启动两个通道,一个连接到每个队列。生产者在通道 [A] 上发送请求并在通道 [B] 中等待响应,消费者在通道 [A] 上等待请求并在通道 [B] 上发送响应。然而,似乎当消费者运行计算响应的回调时,它会阻塞,所以我每次只在每个消费者处执行一个任务。
我到底需要什么:
- 消费者 [A] 将他的任务(每次大约 5k)订阅到集群
- 代理为每个消费者分派 N 条消息/请求,其中 N 是它可以处理的并发任务数
- 当单个任务完成时,消费者用结果回复代理/生产者
- 生产者收到回复,更新计算状态,最后打印一些报告
限制:
- 如果另一个用户提交工作,他的所有任务都将排在前一个用户之后(我想这从队列系统中自动出现,但我没有考虑对线程环境的影响)
- 任务有提交顺序,但回复顺序不重要
更新
我进行了更深入的研究,我的实际问题似乎是我使用一个简单的函数作为 pika 的 SelectConnection.channel.basic_consume() 函数的回调。我的最后一个(未实现的)想法是传递一个线程函数,而不是常规函数,这样回调就不会阻塞,消费者可以继续收听。
python - 如何在 RabbitMQ/pika 中实现优先级队列
我正在寻找使用 RabbitMQ 实现优先级队列。邮件列表建议使用多个队列,每个队列代表不同的优先级。
我的问题是,您如何使用 pika(或可能其他一些 python 库)以某种优先顺序轮询多个队列?
python - 为 Pika ioloop 异步设置超时 (RabbitMQ)
我需要能够优雅地停止在 Pika ioloop 中工作的消费者(工人)。工人应在 60 秒后停止。当前处理的消息应该完成。
我试图connection.close()
在回调函数内部放置一个,但这只会停止当前线程而不是完整的 ioloop。它给出了一个可怕的错误输出。
请参阅我的代码中的第 16 行和以下内容:我使用了(关于 Pika ioloop http://pika.github.com/connecting.html#cps-example的基本示例:
python - RabbitMQ:清除队列
我有一些队列,等等:
目前,我需要刷新此队列中的所有内容。但是,此时,另一个进程可能会发布到该队列。如果我使用 channel.queue_purge(queue='online'),当 queue_purge 仍在工作时,发布的消息会发生什么?
python - rabbitmq什么时候使用tcp背压?
根据Pika 文档,“如果客户端传递消息过快,RabbitMQ 代理会使用 TCP 背压来减慢客户端的速度。” 我已经注册了一个背压回调,它还没有被调用。我的队列有超过 4000 万条消息,而且还在增长。通过将背压乘数设置为 -1,我可以在每次发布消息时调用回调,但这仅对调试有用。
python - Rabbitmq pika 自动重连
我有一些脚本,使用 pika.SelectConnection 与 RabbitMq 服务器通信。
有没有办法让那些脚本尝试自动重新连接到 rmq 服务器,以防服务器出现故障,而脚本工作?