0

上下文:Clojure + RabbitMQ(通过 Langohr),对这个问题的跟进。

我在使用来自 RabbitMQ mq 的消息时得到了奇怪的结果(从直接交换获取消息并在处理消息后发布到扇出交换)。我不明白为什么消息在消费期间最终会出现在单独的线程中(每隔几条消息就会发生线程切换)。

消费者从一个单独的线程开始(以防止它在发生任何 IO 异常时使主线程崩溃),但这并不能解释切换。

; Message handler
(defn message-handler
  [pub-name ch metadata ^bytes payload]
  (let [msg (json/parse-string (String. payload "UTF-8"))
        content (string/join " " (map msg '("title" "link" "body")))
        tags (pluck-tags content)]
    (println (format "HANDLER %s: Message: %s | found tags: %s"
                     (Thread/currentThread)
                     (msg "title")
                     (tags-to-csv tags)))
  (nil)))
  ; (lb/publish ch pub-name "" (json/generate-string (assoc msg "tags" (tags-to-csv tags))))))


(defn -main
  [& args]
  (let [conn          (rmq/connect {:uri (System/getenv "MSGQ")})
        ch            (lch/open conn)
        q-name        "q.events.tagger"
        e-sub-name    "e.events.preproc"
        e-pub-name    "e.events"
        routing-key   "tasks.taggify"]
    (lq/declare ch q-name :exclusive false :auto-delete false)
    (le/declare ch e-pub-name "fanout" :durable false)
    (lq/bind ch q-name e-sub-name :routing-key routing-key)
    (.start (Thread. (fn []
       (lcm/subscribe ch q-name (partial message-handler e-pub-name) :auto-ack true))))))

消息处理程序只打印出当前线程和接收到的有效负载。这就是我得到的:

HANDLER in Thread[pool-1-thread-2,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-2,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ... 
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-4,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-4,5,main]: Message: ...

笔记

我在和特工玩的时候注意到了这一点。我想在其自己的 CPU 绑定线程池中处理每条消息,并将其发布到无界 (IO) 线程池中。但是,在打印出当前线程后,我注意到即使不使用代理(或期货),消息也会由不同的线程处理。

4

2 回答 2

1

1)那里有一个扇出交换,这意味着在路由消息时根本不使用路由密钥。扇出交换将消息路由到绑定到它的每个队列。如果您想使用路由键,请使用直接或主题交换。

2)您总是使用相同的队列名称,这意味着您的代码所做的只是将多个消费者添加到同一个队列中。这意味着rabbitmq 只会围绕你的消费者循环消息。

于 2012-10-13T21:42:11.727 回答
0

Langohr 的作者在这里。

代码中一定缺少某些东西。如果您使用代理获得此输出,那很容易:Clojure 代理(也包括期货和承诺)使用线程池。Langohr 的 langohr.consumers/subscribe 或 RabbitMQ Java 客户端中的底层 QueueingConsumer 没有。

于 2012-10-13T21:41:24.880 回答