问题标签 [python-pika]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
2221 浏览

python-3.x - Python - 在单独线程中运行的函数之间传递函数(回调)变量

我正在尝试开发一个使用 pika 和线程模块的 Python 3.6 脚本。

我有一个问题,我认为这是由于我的 A)对 Python 和一般编码非常陌生,以及 B)我不明白如何在函数在单独的线程中运行并且已经在括号中传递参数时如何在函数之间传递变量在接收函数名称的末尾。

我之所以这样认为,是因为当我不使用线程时,我可以简单地通过调用接收函数名称并提供要传递的变量来在函数之间传递一个变量,在括号中,一个基本示例如下所示:

这在运行时打印:

我需要使用线程的代码的工作版本如下所示 - 它使用直接函数(无线程),我使用 pika 通过 pika 回调函数从(RabbitMQ)队列接收消息,然后我通过在“回调”函数中收到的消息正文到“处理函数”:

这很好用,但是我想将其翻译为在使用线程的脚本中运行。当我这样做时,我必须将参数“channel”提供给在其自己的线程中运行的函数名称 - 然后尝试包含“body”参数,以便“processing_function”看起来如下所示:

我收到一条错误消息:

我知道在使用线程时需要更多代码,并且我在下面包含了用于线程的实际代码,以便您可以看到我在做什么:

这在运行时会产生错误

我不明白为什么会这样或如何解决它。

感谢您抽出宝贵时间阅读此问题,非常感谢您提供的任何帮助或建议。

请记住,我是 python 和编码的新手,所以可能需要拼写出来,而不是能够理解更多神秘的回复。

谢谢!

0 投票
1 回答
568 浏览

python - RabbitMQ - 套接字关闭异常 - Windows Server 2012

所以我有一个发布者,它使用schedule python 包每隔 5-10 分钟从文件中读取数据,并将每一行发布到队列中。另一方面,我有消费者使用类似的东西:

分配任务功能如下所示:

由于某种原因,一段时间后它会引发以下错误:

本质上,发布者和消费者都是 2 个不同的 Python 程序,旨在在装有 Windows Server 2012 的单台机器上运行。社区可以帮助了解这里可能出现的问题。
相同的代码在我的 Windows 机器上本地运行得非常好

以下是我的日志文件的输出。
=ERROR REPORT==== 3-Aug-2017::15:06:48 === closing AMQP connection <0.617.0> ([::1]:53485 -> [::1]:5672): missed heartbeats from client, timeout: 60s

0 投票
1 回答
1738 浏览

python-3.x - Python 3.6:如何在不使用类的情况下使用 SelectConnection 适配器创建 pika 消费者?

我是 python 和 pika 的新手,我遇到了使用 BlockingConnection 适配器从队列中消费的问题,该适配器在几个小时后不断抛出异常。

因此,我现在尝试使用 SelectConnection(异步)适配器,但我只能找到在类中使用此适配器类型的示例,并且使用基于类的代码目前有点超出我的理解。

我确实找到了一个示例,该示例显示了如何使用 SelectConnection 创建生产者,但我找不到一个让我恼火的消费者示例,因为我原以为 pika 网站会详细介绍基本的生产者和消费者,而不仅仅是一个制作人...

生产者代码如下,取自 pika 网站(为什么他们没有包含基本消费者的示例超出了我的范围......):(http://pika.readthedocs.io/en/latest/examples/comparing_publishing_sync_async .html )

谁能建议我如何将这段代码修改为“消费”而不是“生产”,或者你能指出任何只使用基本函数而不是基于类的例子的例子,我发现了很多例子但没有用出于我的特殊目的...

谢谢你。(正如您可能从我的问题的语气中了解到的那样,我现在凌晨 4 点有点压力,我一直在努力解决这个问题好几个小时!)

0 投票
2 回答
671 浏览

python - 试图在没有确认的情况下获取兔子味精,但没有成功

我有一个任务,我尝试在兔子队列中获取所有消息。我只需要获取,而不是消费。所以这是代码,我正在使用

当我运行此代码时,它第一次正常工作。我得到队列中的所有消息,并且所有消息都保持在“就绪”状态,但是当我第二次运行循环时,所有消息都变为“未确认”状态。

要求:每次我应该只获取消息,并且它们不应该不被确认。

第一个循环:

第一个循环

第二循环:

在此处输入图像描述

谁能帮助我,我做错了什么,或者我应该做些什么改变。

提前致谢 :)

编辑1:至于@BarrensZeppelin 的回答,如果我设置no_ack=True,所有的味精都会丢失。检查以下屏幕截图: 在此处输入图像描述

0 投票
2 回答
1096 浏览

python-3.x - Python在RabbitMQ中的函数之外获取变量值

下面是用于从 producer.py 获取消息的函数

这会打印出预期的结果,但我正在寻找的是在回调函数之外获取变量 json_resp 以进行进一步处理

0 投票
1 回答
1008 浏览

rabbitmq - Pika RabbitMQ 客户端服务可以消费和发布消息吗?

有 Pika 经验的人能否就以下功能是否可行,或者我的想法是否表明对 Pika 缺乏概念性理解,给我一个快速的是/否的回应。

我想要的功能:

Python 服务(单线程脚本)使用 SelectConnection 适配器与我的 RabbitMQ 代理建立了一个连接。

该连接有两个通道。

使用一个通道 A,该服务声明一个队列并绑定到某个交换 E1。另一个通道 B 用于声明其他交换 E2。

该服务通过 A 使用队列中的消息。它对这些消息进行一些小处理,[可能通过其与 MongoDB 实例的连接执行 CRUD 操作],然后通过 B 发布消息以交换 E2。

我已经彻底阅读了 Pika 文档,但没有找到足够的信息来了解这是否可行。

简而言之-单个python脚本可以通过一个选择连接适配器连接发布和消费吗?

0 投票
1 回答
3215 浏览

rabbitmq - 使用 pika(python 客户端)通过 RabbitMQ 发布/订阅 MQTT 消息

我有现有的 RabittMQ 服务器设置,我们启用了 MQTT 插件来发布/订阅 mqtt 消息。我们有 pika 客户端来处理现有的队列消息。现在,我们想使用相同的 pika on_message() 处理程序来处理 mqtt 消息。我能够通过 eclipse paho 客户端发布和订阅 mqtt 消息。我们想使用现有的 RabittMQ 客户端(pika)。MQTT 插件默认发布到 amq.topic exchange 。我想将相同的消息发布到我自己的交易所。请告诉我,如何得到这个。

0 投票
0 回答
148 浏览

rabbitmq - RabbitMQ 和 django celery

我正在使用 pika 客户端为某些 pub-sub 项目创建到 rabbitmq 的连接。直接调用客户端可以正常工作,但是当我尝试在 celery 任务中创建通道时出现错误。

意外引发:OSError(9, 'Bad file descriptor')

这是创建连接的代码

我在 connection.channel() 处遇到错误。

我如何在 celery 中创建连接(经纪人也是 rabbitmq)。

0 投票
1 回答
932 浏览

python - Pika 线程执行出错 - 505, 'UNEXPECTED_FRAME

我知道 pika 不是线程安全的,我试图使用锁来访问通道,但仍然出现错误:

PS我不能使用不同的频道。

我能做什么?提前感谢您的帮助

0 投票
1 回答
944 浏览

rabbitmq - 为什么从队列消费时需要循环调用?

我正在尝试使用 pika 和扭曲从兔子队列中消费:

  1. 不断(新消息 -> 消费)
  2. 一次(新消息 -> 消费一次,在我说之前不要再次消费)

我唯一的输入是这个例子。它涵盖了用例 1。用例 2 呢?

不是basic_consume以在新消息准备好时“通知”的方式实现的吗?为什么开始循环呼叫是我的“工作”?轮询似乎与扭曲的事件循环模式相反,不是吗?

当使用 twisted 发出 http 请求时,它会发送请求并在返回后继续执行(通过延迟)。为什么这在 RabbitMQ/pika 中不起作用?

这是我期望它的工作方式:

  1. 订阅队列
  2. 从队列消费(除非队列中有消息,否则不会立即触发,每次队列中有新消息时都会触发,我现在忽略qos。)

我如何使用一条消息,例如basic_get?我是否需要开始循环调用,一旦收到消息,就停止它?