0

最近我发现我的 Clojure/Ring/Jetty 服务器NoClassDefFoundError在我cider-connect进入它并做某事时反复进入。我猜那是因为线程池被一些死线程耗尽了。

然后我在服务器中发现了这个函数,它每天通过定义clojurewerkz.quartzite.scheduler作业运行一次:

(defn consumer-msgs-announcement
  [rabbitmq queue-name & args]
  (with-open [conn (lc/connect rabbitmq)]
    (let [ch  (lch/open conn)]
      (lq/declare ch queue-name {:durable false :auto-delete false})
      (println " [*] Waiting for messages. To exit press CTRL+C")
      ;; (lcons/blocking-subscribe ch queue-name msg-queue/post-wxmsg-handle-delivery {:auto-ack true})
      (lcons/blocking-subscribe ch queue-name handle-delivery-announcement {:auto-ack true})
      )))

其中包含的包定义为:

   [langohr.core :as lc]
   [langohr.channel :as lch]
   [clojure.string :as str]
   [langohr.queue :as lq]
   [langohr.consumers :as lcons]

我怀疑这blocking-subscribe会导致它曾经被永远阻塞的线程耗尽JVM的线程池并最终导致NoClassDefFoundError错误。

我不确定这一点,但我可以blocking-subscribe在重复运行的计划任务中执行吗?

我上面描述的事情可能吗?还是我的扣法有问题?

谢谢。

4

1 回答 1

2

根据我的经验blocking-subscribe,确实会无限期地阻塞线程。您可以快速检查源代码blocking-subscribe以查看它是否进入了可能无穷无尽的惰性序列上的 do-seq。

看起来您只想让您的作业收集自上次运行以来已在队列中备份的任何现有消息。对于这种访问模式,有几个选项对我来说很重要。

  1. 您可以使用langohr.basic/get来处理队列中的消息,直到收到nil.

  2. 您可以使用langohr.consumers/subscribe创建一个简单的非阻塞消费者

    • 启动消费者
    • 保留您获得的消费者标签
    • 睡眠预定的时间
    • 使用 langohr.basic/cancel 取消消费者

PS 我无法发布subscribe/cancel文档的链接,但很容易在源代码和 langohr API 参考中找到。

于 2017-08-03T15:36:39.857 回答