问题标签 [python-pika]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
170 浏览

rabbitmq - rabbitmq pika don't take more than one job when using a thread

I have a problem of ConnectionResetError when I block the IOLoop that channel.start_consuming() is running for a long time. So I've read this code:
https://github.com/pika/pika/blob/0.12.0/examples/basic_consumer_threaded.py
In this code the job is running in a background thread.

The problem is, when my job is running in a thread, The worker can still take more jobs, (i.e, keep getting on_message callbacks). But I don't want my worker to process more than one job at a time.
What should I do? Is it possible to inform the queue that the worker is 'busy' and can't accept jobs for some time?

0 投票
1 回答
1382 浏览

python - 使用 python 从 Windows 商店访问特定证书

我正在尝试使用 Pika 连接到 RabbitMQ。我们正在使用证书 (ssl) 来执行此操作。这是他们(Pika)的示例:

这很好,如果我们的证书文件有一个文件路径,但是我们在 Windows 上并且它们存储在 Windows 商店中。所以我不相信上面提供的 load_cert_chain() 会起作用。

我可以像这样访问(或查看)特定证书:

但这会得到一个证书列表。我没有看到任何明显的方法来搜索并获取我需要的证书。即使可以,我也不确定如何将代码连接到“pika.SSLOptions(context,...)”

所以这里有两个问题,但更重要的是:

  1. 如何从 Windows 商店中提取特定证书(因为我没有文件路径)?

(另一个问题是如何将其连接到 Pika,但如果上述问题得到回答,我可能会弄清楚)

注意:Pika 只是一个与 RabbitMQ 接口的第三方库。 注2:使用Python3.5

0 投票
0 回答
225 浏览

python-3.x - channel.start_sumption() 没有运行并且没有提供任何异常

我是 python 新手。我无法理解channel.start_consuming()不工作。我的代码

基本处理程序定义

当我跑步时,我得到了。我在我的代码中提到了一些打印语句

它没有在“channel.start_sumption()”行之后打印 2 - print("2") 它不会进入 basic_consume 中提到的定义“base_handler”。

我没有得到任何例外

0 投票
1 回答
1815 浏览

python - 来自 BlockingConnection 的 connection_workflow.AMQPConnectorStackTimeout

pika BlockingConnection 尝试失败(pika 版本 1.0.0)。应用程序日志中显示的异常 - adapters.utils.connection_workflow.AMQPConnectorStackTimeout- 似乎不是pika.exceptions.

pika 是否不应该将诸如此类的内部异常转换为其中之一,pika.exceptions然后再将其呈现给调用者?

如果不是,我应该如何枚举 pika 可能引发的所有可能异常,以便我可以决定调用者可以处理哪些以及不应该尝试处理哪些?(本质上,调用者需要决定是否退出不可恢复的异常,或者如果条件可能是暂时的,则重试。)

也许我误解了应用程序日志,如下所示:

0 投票
1 回答
2417 浏览

python-3.x - Python Pika 与 TLSv1_2 的连接

我正在使用 Python3.6 连接到 RabbitMQ。此连接使用 TLSv1.2 协议。设置 SSL 的连接参数:

连接到 rabbitMq 时出现以下错误:

我已经为Connection ParametersTLS params example推荐了 pika 文档,但到目前为止还没有成功。

连接到同一个 Rabbit 主机的类似代码在 Java 中工作:

0 投票
1 回答
95 浏览

python - 将存在插件交换绑定到另一个交换不会产生任何消息

我将 RabbitMQ 3.5.1 与rabbit_presence_exchange(二进制分发)和rabbitmq_event_exchange(帮助调试此问题)插件和 Python Pika 客户端一起使用。

Presence 插件通过给你一个新的交换类型来工作:x-presence。使用路由键将队列绑定到此队列会在队列绑定和未绑定时生成存在通知(例如,路由键是用户名)。在没有路由键的情况下绑定队列会使您注册以接收状态通知。

这很好,我可以成功地生成和接收这样的存在通知。但是,现在我想通过交换路由存在消息。最初,我尝试使用标头交换,但我没有看到任何消息,所以我改为扇出交换(以防我设置的标头匹配不正确)但我仍然没有看到任何消息通过.

这是我在没有额外交换的情况下生成和接收存在消息的脚本(即这是一个有效的脚本):

我修改了上述内容以将存在消息路由到内置的扇出交换并将我的队列绑定到该交换:

我很困惑为什么交易所没有收到消息。Erlang 不是我的语言之一,所以我在尝试读取存在插件的源代码以确定是否支持它时遇到了麻烦(尽管我不明白为什么不支持)。

如果有人有任何想法(或使用 RabbitMQ 处理存在的更好方法),我很想听听。

编辑:
使用此代码和两个客户端运行,我的交换和绑定如下所示:

0 投票
1 回答
788 浏览

python - SelectConnection 没有连接到rabbitmq

我是 Python 编程的新手,所以我尝试使用 pika 使用两个 rabbitmq 队列,但是使用SelectConnection会引发异常IndexError: tuple index out of range An invalid channel number has been specified,但如果我使用BlockingConnection我能够成功使用队列。一些信息: 1 - 我正在使用 pika 网站异步示例 2 - 我的 RabbitMQ 正在使用来自 docker hub 官方图像的 docker 运行

这是我的代码:

我究竟做错了什么?

0 投票
1 回答
142 浏览

python - 发布者等待消费消息,而它应该做这两件事(发布和订阅来自订阅者的回复)。)

发布者在使用第一个 message_reply 后被阻止。它没有向订阅者发送任何消息。

我已经尝试过,如果我不使用 'start_sumption()' 方法,发布者会连续发送数据,但它不会打印来自订阅者的回复消息。如果我使用 'start_sumption()' 方法,它只会阻止发布者并等待。

我想设计这个-

请帮忙。

0 投票
1 回答
304 浏览

pika - 使用 pika.BlockingConnection 我想从队列中消费消息然后退出

我使用 pika.BlockingConnection 和 channel.start_sumption() 编写了一个消费者,它使用来自特定队列的消息,当消息从队列中耗尽时,消费者无限期地等待下一条消息。

有没有一种方法可以指定某种超时持续时间,如果消费者在特定时间段内没有从队列中获取消息,则 start_sumption() 将正常退出该持续时间。

我正在使用 python 3.7.4 和 pika 1.1.0 从 RabbitMQ 3.7.12 消费。

0 投票
0 回答
225 浏览

pika - 如果没有网络连接,pika 的 channel.basic_publish 是否应该提前超时?

我正在升级到 pika 1.1.0,并进行了一些健全性测试。我有:

  • 在下面放了一个断点
  • 断开网络
  • 跨过这个命令

...并且没有抛出异常。这是预期的吗?

连接是通过以下方式创建的:

更新:如果我channel.confirm_delivery()在此之前打电话,我成功地得到了 AMQPError。

但是,这不会在 60 秒内发生(这不是我的 ConnectionParameters)。我怎样才能让它更快地注意到连接丢失?