问题标签 [librabbitmq]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
1504 浏览

c - AMQP RabbitMQ 消费者互相阻塞?

我编写了一个 C (rabbitmq-c) worker 应用程序,它使用 Python 脚本 (pika) 发布的队列。

我有以下似乎无法解决的奇怪行为:

  1. 在消息发布到队列之前启动所有工作人员按预期工作
  2. 队列发布后启动 1 个工作人员按预期工作
  3. 但是:在工作人员开始从队列中消费之后启动额外的工作人员意味着这些工作人员看不到队列上的任何消息(消息计数 = 0),因此只需等待(即使队列上仍有许多消息)。杀死第一个工作人员会突然开始消息流向所有其他(等待的)消费者。

任何想法可能会发生什么?

我已经尝试确保每个消费者都有自己的频道(这是必要的吗?)但仍然是相同的行为......

这是消费者(工人)的代码:

0 投票
1 回答
268 浏览

python - 芹菜 + 反向主题交换(x-rtopic)不起作用

我正在尝试使用来自 Alvaro Videla的反向主题交换 rabbitmq 插件创建一个 celery 应用程序。工作人员似乎可以使用此交换很好地与经纪人联系,但是当我对我的任务进行主题反向路由时,不要选择“#”或“*”,就像直接交换一样。

这就是我的队列:

现在图片 2 个工作人员使用以下 routing_key

  • Worker1:intel.8.host1
  • Worker2:amd.2.host2

这就是我正在尝试的任务以及我所经历的路由键:

为了确定问题出在哪里,我测试了使用 pika 和 kombu 进行简单消息传递的插件,并且两者都运行良好,完全符合预期。所以我认为 Celery 交换(路由)消息的方式一定有问题。也许我应该创建一个自定义路由类!?

提前致谢。

0 投票
1 回答
974 浏览

ios - 如何集成rabbit mq客户端库

在大量关于 Rabbit MQ 的搜索中,我找到了目标 C 包装器,librabbitmq-c其链接指向librabbitmq-objc

对于 librabbitmq-c 链接找到https://github.com/alanxz/rabbitmq-c

我试图将两者集成到我的应用程序中,但产生了很多错误,例如

我还通过此链接尝试了最新版本的 librabbitmq-c https://github.com/alanxz/rabbitmq-c/releases/download/v0.5.2/rabbitmq-c-0.5.2.tar.gz

第一个和第二个问题通过用<Cocoa/Cocoa.h><Foundation/Foundation.h> 替换<amqp.h>解决"amqp.h"

但我无法解决其余的问题

我的客户端库实现如下: -

任何有关Rabbit MQ的帮助将不胜感激,谢谢

0 投票
1 回答
486 浏览

c - 如何检查消息是否传递到任何单个队列

我在 linux 中使用 rabbitmq-c lib。我想要这么基本的请求和响应。我使用了 amqp_basic_publish必填字段为true,我只想知道消息是在任何队列中传递的。我想要基本的c示例。

0 投票
1 回答
1728 浏览

celery - 如何判断 Celery 是否使用 librabbitmq?

根据芹菜文档

librabbitmq

如果您使用 RabbitMQ (AMQP) 作为代理,那么您可以安装 librabbitmq 模块以使用用 C 编写的优化客户端:

'amqp' 传输将自动使用 librabbitmq 模块(如果已安装),或者您也可以使用 pyamqp:// 或 librabbitmq:// 前缀直接指定所需的传输。

我安装librabbitmq并更改了BROKER_URL设置,使其以librabbitmq://.

我如何验证 Celery 现在正在使用 librabbitmq(即我做的一切正确)?

0 投票
0 回答
148 浏览

php - RabbitMQ 多通道

我已经使用 RabbitMQ 和 NodeJs 开发了一个 PHP 应用程序。然后我在同一台服务器上制作了该应用程序的副本。问题是在任何应用程序中发送任何消息,另一个应用程序也在列表中,所以我希望每个应用程序都在隔离通道上工作。

发送.php

接收.php

0 投票
1 回答
136 浏览

rabbitmq - Rabbitmq Acking 在多个消费者中

我有一个队列,多个消费者以循环方式工作。

Delivery_tag 用于在消费者完成处理后确认消息。但是多个消费者设置相同delivery_tag,尽管他们正在处理的消息不同。

结果,当 acking 完成时,我得到了precondition failed

谁能建议我做错了什么?

这是消费者代码。我从这个链接得到它:

http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/

0 投票
0 回答
344 浏览

amazon-elb - kombu/librabbitmq:ConnectionError:无法恢复频道

  1. 我正在使用生产者池组向 rabbitmq 发布消息,但是发布调用失败并出现以下错误:

.

  1. 奇怪的是,我收到此错误的消息之前的消息没有到达消费者端。我在 Connection 对象上使用 transport_options={'confirm_publish': True} ,该对象被传递给生产者池。

  2. rabbitmq 服务器位于 AWS 上的 ELB 后面。

关于上面这个错误的任何想法以及为什么消息可能会丢失?

谢谢。

0 投票
1 回答
930 浏览

celery - 带有 Python 3 的 Librabbitmq 2.0.0 给出了 TypeError: can't pickle memoryview objects

我正在使用 git repo https://github.com/celery/librabbitmq的最新主分支,并按照自述文件中的说明安装librabbitmq==2.0.0Python 3.6

使用开发版本

您可以通过执行以下操作克隆存储库:

然后通过执行以下操作安装它:

这工作正常(在操作系统中安装某些二进制文件以进行 c 编译之后),但是当我随后制作一个小的a+b添加任务并使用它调用它add.delay(2,2)时失败并出现以下错误。抬头一看,Celery 4 使用 json 作为序列化器,所以显然不是因为如果 pickle 序列化

  1. 从 librabbitmq 更改为 pyamqp 代理工作正常
  2. MacOS 和 Ubuntu 16 中的完全相同的情况

[2018-04-30 23:40:02,956: CRITICAL/MainProcess] 不可恢复的错误:SystemError('返回了带有错误集的结果',)回溯(最近一次调用):文件“/Users/somghosh/.virtualenvs/ ctdb/lib/python3.6/site-packages/kombu/messaging.py",第 624 行,在 _receive_callback 中返回 on_m(message) if on_m else self.receive(decoded, message) File "/Users/somghosh/.virtualenvs/ ctdb/lib/python3.6/site-packages/celery/worker/consumer/consumer.py”,第 570 行,在 on_task_received 回调中,文件“/Users/somghosh/.virtualenvs/ctdb/lib/python3.6/site- packages/celery/worker/strategy.py”,第 145 行,task_message_handler 句柄(req)文件“/Users/somghosh/.virtualenvs/ctdb/lib/python3.6/site-packages/celery/worker/worker.py” ,第 221 行,在 _process_task_sem 返回 self._quick_acquire(self._process_task, req) 文件“/Users/somghosh/.virtualenvs/ctdb/lib/python3.6/site-packages/kombu/async/semaphore.py”,第 62 行,在获取回调中(*partial_args, **partial_kwargs) 文件“/Users/somghosh/.virtualenvs/ctdb/lib/python3.6/site-packages/celery/worker/worker.py”,第 226 行,在 _process_task req.execute_using_pool(self.池)文件“/Users/somghosh/.virtualenvs/ctdb/lib/python3.6/site-packages/celery/worker/request.py”,第 531 行,在 execute_using_pool correlation_id=task_id,文件“/Users/somghosh/. virtualenvs/ctdb/lib/python3.6/site-packages/celery/concurrency/base.py”,第 155 行,在 apply_async **options) 文件“/Users/somghosh/.virtualenvs/ctdb/lib/python3.6/site-packages/billiard/pool.py”,第 1486 行,在 apply_async self._quick_put((TASK, (result._job, None, func, args, kwds))) 文件“/Users/somghosh/.virtualenvs/ ctdb/lib/python3.6/site-packages/celery/concurrency/asynpool.py",第 813 行,在 send_job body = dumps(tup, protocol=protocol) TypeError: can't pickle memoryview objects

上述异常是以下异常的直接原因:

Traceback(最近一次调用最后):文件“/Users/somghosh/.virtualenvs/ctdb/lib/python3.6/site-packages/celery/worker/worker.py”,第 203 行,在 start self.blueprint.start( self) 文件“/Users/somghosh/.virtualenvs/ctdb/lib/python3.6/site-packages/celery/bootsteps.py”,第 119 行,在 start step.start(parent) 文件“/Users/somghosh/. virtualenvs/ctdb/lib/python3.6/site-packages/celery/bootsteps.py”,第 370 行,在开始返回 self.obj.start() 文件“/Users/somghosh/.virtualenvs/ctdb/lib/python3. 6/site-packages/celery/worker/consumer/consumer.py”,第 320 行,在 start blueprint.start(self) 文件“/Users/somghosh/.virtualenvs/ctdb/lib/python3.6/site-packages/ celery/bootsteps.py",第 119 行,在 start step.start(parent) 文件中"/Users/somghosh/.virtualenvs/ctdb/lib/python3.6/site-packages/celery/worker/consumer/consumer.py”,第 596 行,在 start c.loop(*c.loop_args()) 文件中“/ Users/somghosh/.virtualenvs/ctdb/lib/python3.6/site-packages/celery/worker/loops.py”,第 88 行,在 asynloop next(loop) 文件中“/Users/somghosh/.virtualenvs/ctdb/lib /python3.6/site-packages/kombu/async/hub.py”,第 354 行,在 create_loop cb(*cbargs) 文件“/Users/somghosh/.virtualenvs/ctdb/lib/python3.6/site-packages/ kombu/transport/base.py”,第 236 行,on_readable 阅读器(循环)文件“/Users/somghosh/.virtualenvs/ctdb/lib/python3.6/site-packages/kombu/transport/base.py”,行218、在_read drain_events(timeout=0)文件“/Users/somghosh/.virtualenvs/ctdb/lib/python3.6/site-packages/librabbitmq-2.0.0-py3.6-macosx-10.6-intel.egg/librabbitmq/init .py",第 227 行,在 drain_events self._basic_recv(timeout) SystemError:返回带有错误集的结果

0 投票
1 回答
209 浏览

rabbitmq - amqp_basic_qos 没有任何效果

我正在尝试使用 librabbitmq 编写一个简单的消费者。它正在工作,但是当我执行 amqp_basic_consume 时,它​​会消耗整个队列。我想要的是它得到一条消息,处理它并重复。

我尝试使用 basic_qos 一次让消费者预取 1,但这似乎根本没有效果。

基本设置和循环: // 一次设置 1 条消息的 qos if (!amqp_basic_qos(conn, channel, 0, 1, 0)) { die_on_amqp_error(amqp_get_rpc_reply(conn), "basic.qos"); }

我希望有一个队列被填满,我可以开始处理,如果我点击 ^C,剩余的消息仍在队列中。相反,即使我只处理了一条消息,整个队列也会被清空。