问题标签 [python-pika]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
rabbitmq - Pika - 开始消费时如何处理 NoneTypes 异常
我正在尝试channel.consume
使用 Pika 0.11.2 从 RabbitMQ 服务器获取消息时进行验证
通话时get_mq
,我检测到:队列中的消息为空时发生异常。那么,我怎样才能验证异常
这是我的代码:
错误信息:
“Nonetype”对象不可迭代
multithreading - 如何关闭 pika 消费线程
对 Pika 来说有点新,我有一些代码,比如:(这里不包括构造函数和其他不相关的函数)
这是在一个单独的消费线程中。但是当我从线程调用关闭消费者时,有时它会在断开连接函数中遇到异常,说连接已经关闭,然后我会在关闭时看到打印;有时它只是在运行中打印并退出运行,并且从未在关机时看到打印,这对我来说似乎是不完整的关机。
所以有人知道原因吗?关闭鼠兔消费者线程的正确方法应该是什么?顺便说一句,我正在使用 pika blockingConnection
谢谢
python-3.x - 如何使用 Pika 和 Python 检查 RabbitMQ 中是否没有消息
我使用 pika python 库从 RabbitMQ 读取消息。在循环中读取消息由
这工作正常。但我也需要阅读一条消息,我这样做:
但是,当队列中没有消息时,脚本就会出现问题。如何实现此处描述的 get_empty() 方法?
python-3.x - 无法收听 RabbitMQ 消息
在我的项目中,我通过 RabbitMQ 访问 API 服务并期待消息。
我正在使用 Python 的行为框架进行测试。
我正在连接一个 RabbitMQ。创建队列,将队列与交换绑定。
在开始执行测试之前,我正在调用 create_connection 方法,该方法将建立连接、声明交换和队列。
然后我点击 API 服务并期待通过 RabbitMQ 收到一条消息,我在其中调用消费函数来消费消息。但我没有收到任何消息。任何帮助将不胜感激。
python - RabbitMQ pika.exceptions.ConnectionClosed (-1, "error(104, 'Connection reset by peer')")
我在 RabbitMQ 中有一个任务队列,其中有多个生产者 (12) 和一个用于 webapp 中繁重任务的消费者。当我运行消费者时,它开始将一些消息出列,然后崩溃并出现此错误:
生产者代码是:
唯一的消费者代码(一个是冲突的):
如您所见,消息有 3 个昂贵的函数。一项裁剪任务、一项 API 调用和一项数据库更新。没有 API 调用,que consumer 运行流畅。
提前致谢
python-3.x - RabbitMQ Pika 连接重置 , (-1, ConnectionResetError(104, 'Connection reset by peer'))
通过stackoverflow搜索并发布此问题,因为没有解决方案对我有用,而且我的问题可能与其他问题不同。
我正在编写一个脚本,它从rabbitMQ队列中获取一篇文章并处理文章以计算单词并从中提取关键词并将其转储到数据库中。我的脚本工作正常,但在执行一段时间后,我得到了这个异常
(-1, "ConnectionResetError(104, 'Connection reset by peer')")
我不知道我为什么会得到这个。我已经尝试了很多在 stackover flow 上可用的解决方案,没有一个对我有用。我已经编写了我的脚本并以两种不同的方式进行了尝试。两者都可以正常工作,但一段时间后会发生相同的异常。
这是我的第一个代码:
这是我的第二个代码:
在我的第一个代码中,我使用了线程,因为我想加快处理文章的过程。
这是我的回调函数
def on_message(ch, method, properties, message):
Logger.log_message("Starting parsing new msg ")
handle_message(message)
编辑:完整代码
pika 没有花很多时间来处理消息,但我仍然面临连接重置问题。
**处理消息所用的总时间:0.0005991458892822266 **
python-3.x - 使用 pika 连接到 rabbitmq 容器的容器的 HTTP 代码 500 api 错误
我正在使用 docker-compose.yml 管理 5 个 docker 容器。
容器功能是:
- 消息生产者 - 消息生产者代码封装在一个类中,并在 45 个线程中创建 45 个实例
- rabbitmq 容器 - 这是 rabbitmq:latest 图像,用于对产生的消息进行排队
消息接收器 - 消息接收器代码封装在一个类中,并在 90 个线程中创建 90 个实例。回调函数处理rabbitmq容器中的消息并将结果写入数据库容器。回调包括 basic_ack:
ch.basic_ack(delivery_tag=method.delivery_tag)
数据库容器 - 这是 Centurylink/mysql:latest 镜像
- Django 站点 - 连接到数据库容器并用于查询数据库的内容
在生产者和接收者中使用相同的参数以相同的方式创建到 rabbitmq 容器的连接:
最初,如果您刷新 Django 站点,该过程可以正常工作,并且记录会按预期明显更新。在几分钟后的某个时刻,docker 减速并最终停止并刷新 django 页面需要 20 多秒。消息生产者最终退出并出现错误:
message_producer_1 | 被杀
this_project_message_producer_1 的意外 API 错误(HTTP 代码 500)响应正文:连接尝试失败,因为连接方在一段时间后没有正确响应,或者连接失败,因为连接的主机没有响应。
我不确定从哪里开始寻找/解决这里的问题。一切都开始工作然后中断的事实让我认为这可能是由于消息队列大小或容器可用的资源,但我不知道如何测试这一点。
有什么帮助吗?
python - rabbitmq 是如何同步消息的?
我使用这个简单的代码pika
:
问题:
一旦我收到一条消息,并且我的on_message
回调正在运行,rabbitmq 如何确保在我调用之前不向其他工作人员发送相同的消息basic_ack
?basic_ack
因为在我的测试中,如果我多次运行代码,如果我不调用该消息将再次发送。从发送消息的那一刻起,rabbitmq 是否有一些超时,直到它得到一个ack
?
python - pika, rabbitmq - 从队列中获取所有消息而不消耗它们
使用pika
客户端,我想显示当前队列中的所有消息,而不使用它们。只是想知道队列有多忙并显示作业。
到目前为止,我只能在一条消息到达时阅读它:
我如何阅读整个队列?