问题标签 [php-amqplib]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
php - 如何向特定的虚拟机 IP 发送消息
所以我将RabbitMq与PHP一起使用,我的服务器包含 RabbitMq 和两个 Vms(ubuntu 服务器),有关更多详细信息,这是我的代码;
在 RabbitmQ 服务器中:send.php:
这是第一个 Vm 的代码:recive.php:
注意:在我的 RobbitmQ 中,当我使用 localhost 时我没有收到错误,但是使用 localhost 我无法控制女巫 Vm 我将发送消息
php - RabbitMQ:从 DLX 绑定
我已经搜索了该信息(包括文档),但找不到。
我正在使用最新版本的php-amqplib和 RabbitMQ v. 2.7.1。我有三个队列和三个交换:
行为非常简单:消息发布到EXCHANGE_TO_PROCESS
. 外部工作人员处理消息:如果处理正常,则消息仅被确认,因此从队列中删除(这部分工作正常);如果处理出错,则将消息插入到EXCHANGE_WAITING
where,在 5 分钟的 TTL 后,通过 DLX 将其重新插入到EXCHANGE_TO_PROCESS
重新处理中。但是,在第三次失败之后,它会被插入到EXCHANGE_TO_CLEAN
cron 作业将出现的地方并清理消息、日志错误等。
然而,我遇到的问题是代码清楚QUEUE_WAITING
地将命令。5 分钟结束后,该消息随即消失。我不太清楚为什么。EXCHANGE_WAITING
QUEUE_TO_PROCESS
QUEUE_WAITING
所有这一切都让我们想到了我的问题:死信交换是否将参数中的交换隐式绑定到队列?并且:我丢失的消息可能会发生什么?
编辑
我比以前更困惑。我尝试了以下非常基本的代码:
这创建了两个队列和两个交换(我已经看到它们以fanout
避免打扰路由键),将 queueA 绑定到 exchangeA 并将 queueB 绑定到 exchangeB,在 queueA 上设置一个 TTL 并将其 DLX 到 exchangeB。观察管理页面中发生的情况,我看到一条消息在 queueA 中花费了 5 秒,正如预期的那样,然后消息消失了,就像我上面更复杂的代码一样。
php - 与 RabbitMQ 的主题交换歧义
我有点困惑。我正在尝试实施主题交换,但不确定需要什么。
我想要几个路由键和 1 个主题交换(默认 amq.topic)。我的钥匙是这样的:
- customer.appA.created
- customer.appB.created
- 客户.*.created
我希望我的队列是持久的,但我需要 1 个“客户”队列还是 appA 和 appB 的 2 个队列?我已经弄清楚了我的出版商;连接,交换声明,基本发布。
但我正在与消费者斗争。假设我要打开 3 个控制台,每个控制台对应上述路由键。
我当前的消费者有:连接、交换声明、队列绑定、基本消费。这些连接到一个持久的“客户”队列。但是,我的消息被轮询到每个控制台/消费者,而不是使用路由键。
所以我的问题;
- 对于典型的主题交换设置;你需要多少个队列?
- 我的消费者可以只使用交换绑定,还是必须包括队列交互?
- 一条消息是否可以通过主题交换出现在 2 个消费者中(或者您是否需要为此进行扇出)?
php - RabbitMQ PHP AMQP 库 - 获取消息头
我有一个简单的排队系统,很明显,它接收消息并发布它们。
但是,由于系统的新开发,我们现在需要检查x-death
交换中的标头,但是,我似乎没有任何有关如何通过 PHP AMQP 库检索该标头的文档。
有人对如何实现这一目标有任何想法吗?
php - 完成所有其他工作后重新使用已确认的消息
我正在实现 RabbitMQ 以在另一台服务器上执行一些图像编辑操作。但是,有时请求可能会在源图像同步到该服务器之前到达该服务器 - 在这种情况下,我想将消息弹出回队列并在所有其他操作完成后处理它。
但是,使用重新提交位设置调用 basic.nack 会使我的队列立即重新接收该消息 - 在任何可以实际完成的操作之前。
目前我觉得我被迫实现一些逻辑,只是将原始消息重新提交给交换,但我想避免这种情况。既是因为同一条消息可能已在另一台服务器上成功处理(使用它自己的队列),也是因为我希望这是一种常见的模式,必须有更好的方法。
(哦,我在消费者和服务器代码中都使用 php-amqplib)
谢谢!
更新:按照 zaq178miami 的建议,我使用死信交换解决了我的问题
我目前的解决方案:
$dead_letter_exchange
在原始队列上声明死信交换$worker
- 宣布恢复交换
$recovery_exchange
- 声明一个队列
$dead_queue
,ax-message-ttl
为 5 秒并x-dead-letter-exchange
设置为$recovery_exchange
- 绑定
$dead_letter_queue
到$dead_letter_exchange
- 并绑定
$worker
到$recovery_exchange
$dead_letter_exchange
并且$recovery_exchange
是生成的名称,基于我正在消费的交换和价值$worker
使每条被 nack 的消息在五秒钟后仅在该特定队列(服务器)上返回给工作人员进行重试。我可能仍想应用一些在$n
重试后将消息丢弃的逻辑。
我仍然对更好的想法持开放态度;-)
php-amqplib - php-amqplib 非阻塞示例引发警告
我正在尝试使用 videlalvaro/php-amqplib 库进行 AMQP 通信,但似乎无法让非阻塞版本工作。运行 demo/amqp_consumer_non_blocking.php 只是不断地显示以下错误消息:
使用 @ 符号抑制此消息可以消除错误,但我仍然无法让它显示任何内容。
我假设我正确地向它发送了一条消息:我正在使用 Web 界面,只是将一条消息直接发布到队列中,但就像我说的那样,我在我的非阻塞循环中没有得到任何东西。
使用阻塞演示 (demo/amqp_consumer.php) 执行相同的过程可以按预期工作。
有没有其他人遇到过这类问题,或者他们可以指导我找到一些可能的解决方案吗?谢谢。
php - RabbitMq: Replace duplicated messages
I'm using RabbitMq for submitting data for registered web hooks.
base info: If a contact is created in the system, the message is put in the queue and the consumer is sending later the hook data to the registered url.
To my question: Its possible, that a contact is updated in 5 seconds twice and that both messages are still in the queue. But I'd like, if the second message is queued, that the first message will be removed.
I know that i can't delete the message manually. But is it somehow possible to set an id on the message and if two messages with the same id are in the same queue, that the first one is automatically removed/replaced? That only one request is sent to the url. I know you can set a message id on the message self. But I have nothing found to replace the old one.
My PHP Code (simplified):
Btw i'm using the the php amqplib https://github.com/videlalvaro/php-amqplib.
Thanks for the help Flo
php - 消息传递信息中的 RabbitMQ 缺少通道引用
我目前正在使用 basic_get 从 rabbitMQ 获取消息后实现一些逻辑,而不会自动发送接收到的消息的 ack。
根据此处的教程(消息确认部分),我无法在 msg 本身中找到通道引用并像上面链接中提到的那样发送 ack:
那是因为在我的消息传递信息数组中没有这样的通道。
我想知道它怎么会丢失。
编辑:基本获取的代码片段
这是我的消息的 var_dump:(黄色部分)
l
rabbitmq - 处理 10 条消息后如何停止 Rabbitmq Consumer?
我从 crontab 运行消费者,它一一处理所有消息,有没有办法只消费 10 或 20 条消息,然后停止给消费者。
所以下次 cron 会调用消费者,同样的过程会再次发生。
php - amqp can't access to rabbitmq
When trying to access the rabbitmq from client side and it return 500 internal server error.
Here was the code to test the connection between client side and rabbitmq.
Here the error log. How to resolve the problem like this?
Error log :
PHP Fatal error: Uncaught exception 'PhpAmqpLib\Exception\AMQPRuntimeException' with message 'Error Connecting to server(13): Permission denied ' in /var/www/html/mydir/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php:27\nStack trace:\n#0 /var/www/html/mydir/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPStreamConnection.php(21): PhpAmqpLib\Wire\IO\StreamIO->__construct('ip adress', 15672, 3, 3, NULL)\n#1 /var/www/html/mydir/send.php(13): PhpAmqpLib\Connection\AMQPStreamConnection->__construct('ip address', 15672, 'guest', 'guest')\n#2 {main}\n thrown in /var/www/html/mydir/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php on line 27