问题标签 [langohr]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
multithreading - Clojure + RabbitMQ / 多线程消息消费
上下文:Clojure + RabbitMQ(通过 Langohr),对这个问题的跟进。
我在使用来自 RabbitMQ mq 的消息时得到了奇怪的结果(从直接交换获取消息并在处理消息后发布到扇出交换)。我不明白为什么消息在消费期间最终会出现在单独的线程中(每隔几条消息就会发生线程切换)。
消费者从一个单独的线程开始(以防止它在发生任何 IO 异常时使主线程崩溃),但这并不能解释切换。
消息处理程序只打印出当前线程和接收到的有效负载。这就是我得到的:
笔记
我在和特工玩的时候注意到了这一点。我想在其自己的 CPU 绑定线程池中处理每条消息,并将其发布到无界 (IO) 线程池中。但是,在打印出当前线程后,我注意到即使不使用代理(或期货),消息也会由不同的线程处理。
clojure - 如何在 clojure 中存根 langohr RabbitMQ 交互?
我正在尝试对 RabbitMQ 交互进行存根,因为这些并不是我正在编写的应用程序的主要目的。
所以,我尝试在我的测试中重新绑定 langohr 函数,如下所示:
当我运行测试时lein test
,我得到一个
我一直在尝试其他几种方法,包括不同的测试框架来重新定义或重新绑定 langohr 库函数,但没有任何进展。
我已经测试了其他场景,并且我已经成功地使用上述代码结构对 cheshire (json parsing clojure lib) 函数进行了存根。我谦虚地请求帮助,以了解为什么我的 langohr 存根不起作用,以及如何以优雅的方式做到这一点的提示。
macros - 从 Clojure 代码中重构冗余以实现 RabbitMQ 扇形
我正在浏览他们网页上的 RabbitMQ 教程,作为练习,我试图重构他们提供的示例,以使它们更加通用和可组合。我被困在第二个“blabbr”示例上。这是我要重构的功能:
我以为我可以制作一个宏并在函数中调用它,如下所示:
wrap-publish
但是,当我在 repl 上单独测试宏时,出现此错误:
似乎有一些全球性的事情不会让我绑定我的变量,但我不知道是什么。我顺着鼻子找到了从堆栈跟踪中扔给我的源代码 ,并在那里遇到了死胡同。我只是不知道该怎么办。我是一名新程序员,正在逐步进入异步和宏的世界。因此,我将不胜感激任何建议,这些建议不仅可以帮助我实现目标,还可以提供一般见解,这些见解将告知我的基本技能,并让我知道我是否采取了正确的方法。我正在使用 langohr 依赖项 [com.novemberain/langohr "2.9.0"]。
clojure - 如何在 Langohr 中容忍 RabbitMQ 重启?
我们有从 Rabbit 队列中读取的 Clojure 代码。我们愿意容忍 RabbitMQ 服务器短暂关闭的情况,例如在重新启动的情况下 ( sudo service rabbitmq-server restart
)。
Langohr似乎有一些重新连接的规定。我们改编了这个例子clojurewerkz.langohr.examples.recovery.example1
(这里是要点)。与已发布示例的细微差别包括连接参数和lb/publish
调用的删除(因为我们使用外部源填充数据)。
我们可以成功地从队列中消费数据并等待更多消息。但是,当我们重新启动 RMQ(通过上面的sudo
命令在托管 RabbitMQ 的 VM 上)时,会抛出以下异常:
Langohr 提供的预期重启机制似乎在启动时会中断。在这些“硬”重启的情况下,是否有另一种首选模式?或者,我想我们必须实现连接监控并自己重试。任何建议都将受到欢迎。
clojure - 当rabbitmq服务器重新启动时,RabbitMQ Clojure Langohr不会抛出错误
我有以下代码来监视数据库表以每隔几秒钟创建一次 MQ 消息。我发现当我重新启动 RabbitMQ 服务器时,应用程序仍然运行而没有抛出异常,并且它仍然创建了打印消息。那么为什么它不会抛出异常呢?另一个问题是当我杀死应用程序时如何关闭连接?既然是服务,我没地方写代码来关闭 RabbitMQ 连接。
clojure - 有没有办法让 RabbitMQ 批量传递消息,而不是一次传递一个消息?
我将 RabbitMQ 与 Clojure 和 Langohr 一起使用,我想分批处理队列中的消息,而不是一次处理一个。当然,我可以在将消息从队列中拉出后自己批处理消息,但我很好奇是否有一个 API 调用或我缺少的设置可以让 RMQ 一次将 500 条消息传递到一个消费者。这可能吗?
clojure - 如何使用 Langohr 在 RabbitMQ 中重新传递排队的消息?
背景
我们使用 Langohr(版本 3.4.0)使用来自 RabbitMQ 的消息,并尝试将它们持久化到 MongoDB 中。我们没有使用auto-ack
,因为如果我们无法将消息持久保存在 MongoDB 中,我们希望稍后能够重试。我们正在使用ack-unless-exception
函数来适应这种情况。今晚 MongoDB 发生了临时中断,并在短时间内停机,在此期间,有 40 条消息无法持久化,因此保留在 RabbitMQ 队列中。但是当 MongoDB 再次启动时,我们的 Langohr 处理程序刚刚收到新消息。在我们重新启动我们的应用程序之前,旧的并没有交付给我们。
问题
我们怎样才能让 RabbitMQ 用 Langohr 重新传递以前的 nack:ed 消息,而不必重新启动我们的应用程序?
rabbitmq - 指示 RabbitMQ 定期重新发送未传递的消息
背景
我们正在使用langohr与 RabbitMQ 进行交互。我们尝试了两种不同的方法来让 RabbitMQ 重新发送我们的服务尚未正确处理的消息。一种可行的方法是将basic.nack
带有requeue
set 的 a 发送到,true
但这将立即重新发送消息,直到服务以 a 响应basic.ack
。例如,如果服务尝试将消息持久保存到当前已关闭(并且已关闭一段时间)的数据存储区,这会有点问题。对我们来说,最好每 20 秒左右获取一次未传递的消息(即,我们既不做,basic.ack
或者basic.nack
如果数据存储已关闭,我们只是让消息保留在队列中)。我们尝试使用ExecutorService
其要点来实现这一点,如下所示:
不幸的是,这似乎不起作用(消息没有重新传递,只是保留在队列中)。如果我们重新启动服务,排队的消息将被正确使用。但是,我们还有其他使用spring-rabbitmq(Java 中)实现的服务,他们似乎开箱即用地处理了这个问题。我已经尝试查看源代码以弄清楚他们是如何做到的,但我还没有设法这样做。
问题
你如何指示 RabbitMQ 定期(重新)传递队列中的消息(最好使用 Langohr)?
clojure - Langohr 消息处理程序未在 clojure 映射中转换 amqp 标头
我有一个 langohr 消息处理程序,它现在只是读取有效负载和标头并打印它们。问题是当我在交易所发布消息时,我无法查询记录的标头(获取标头“h1”),也无法查询本机 clojure 映射。我在这里做错了什么?
发布带有标头的消息时
user => (publish ch "some.topic.exchange" "key1" "{\"id\":1}" {:headers {"h1" "value"}}) nil
输出
作业状态更新:{:status {:id 1}, :metadata #object[java.util.HashMap 0x3cb4bf18 {h1=value}]}
clojure - 我可以在重复运行的计划任务中执行“阻塞订阅”吗?
最近我发现我的 Clojure/Ring/Jetty 服务器NoClassDefFoundError
在我cider-connect
进入它并做某事时反复进入。我猜那是因为线程池被一些死线程耗尽了。
然后我在服务器中发现了这个函数,它每天通过定义clojurewerkz.quartzite.scheduler
作业运行一次:
其中包含的包定义为:
我怀疑这blocking-subscribe
会导致它曾经被永远阻塞的线程耗尽JVM的线程池并最终导致NoClassDefFoundError
错误。
我不确定这一点,但我可以blocking-subscribe
在重复运行的计划任务中执行吗?
我上面描述的事情可能吗?还是我的扣法有问题?
谢谢。