0

我有一种情况,我有一个带有 2 个队列 Q1 和 Q2 的 activemq 代理。我有两个使用 activemessaging 的基于 ruby​​ 的消费者。我们称它们为 C1 和 C2。两个消费者都订阅每个队列。我在订阅每个队列时设置了 activemq.prefetchSize=1。我也在设置 ack=client.

考虑以下事件序列:

1) 触发长时间运行作业的消息发布到队列 Q1。称它为 M1。

2) M1 被分派给消费者 C1,开始一个长操作。

3) 两条触发短作业的消息发布到队列 Q2。将这些称为 M2 和 M3。

4) M2 被派往 C2,C2 快速运行短作业。

5) M3 被分派到 C1,即使 C1 仍在运行 M1。它能够分派到 C1,因为 prefetchSize=1 是在队列订阅上设置的,而不是在连接上。因此,已经发送了 Q1 消息这一事实并不能阻止发送 Q2 消息。

由于 activemessaging 消费者是单线程的,因此最终结果是 M3 在 C1 上等待很长时间,直到 C1 完成对 M1 的处理。因此,尽管消费者 C2 处于空闲状态(因为它很快以消息 M2 结束),但 M3 没有长时间处理。

本质上,每当运行一个长 Q1 作业然后创建一大堆短 Q2 作业时,恰好有一个短 Q2 作业卡在等待长 Q1 作业完成的消费者身上。

有没有办法在连接级别而不是订阅级别设置 prefetchSize?我真的不希望在处理 M1 时将任何消息发送到 C1。另一种选择是我可以创建一个专用于处理 Q1 的消费者,然后让其他消费者专用于处理 Q2。但是,我宁愿不这样做,因为 Q1 消息很少——Q1 的专用消费者会在一天中的大部分时间处于空闲状态,占用内存。

4

2 回答 2

2

根据 ActiveMQ 文档的扩展 stomp 标头 ( http://activemq.apache.org/stomp.html),activemq.prefetchSize仅在 SUBSCRIBE 消息上可用,而不是 CONNECT 。以下是相关信息:

动词:订阅

头文件:activemq.prefetchSize

类型:整数

描述:指定将分派给客户端的待处理消息的最大数量。一旦达到此最大值,就不会再发送消息,直到客户端确认消息为止。设置为 1 以便在处理消息可能很慢的消费者之间非常公平地分配消息。

我对此的阅读和经验是,由于尚未确认 M1(b/c 您已打开客户端确认),因此此 M1 应该是订阅上设置的 prefetchSize=1 允许的 1 条消息。听到它不起作用我很惊讶,但也许我需要进行更详细的测试。您的设置应该适合您想要的行为。

我从其他人那里听说过关于 activemq 调度的问题,所以这可能是您正在使用的版本的错误。

我的一个建议是嗅探网络流量以查看 M1 是否由于某种原因得到确认,或者将一些 puts 语句放入 ruby​​ stomp gem 以观察通信(这是我通常最终会做的事情调试单脚问题)。

如果我有机会尝试一下,我会用我自己的结果更新我的评论。

一个建议:很可能会发送多条长处理消息,如果长处理消息的数量超过您的进程数,您将处于此修复中,快速处理消息正在等待。

我倾向于至少有一个专门的进程来完成快速的工作,或者换句话说,专门的一组进程来完成更长的工作。无论调度做什么,让所有轮询器消费者进程都听长和短可能会导致次优结果。进程组是配置消费者收听目标子集的方法:http ://code.google.com/p/activemessaging/wiki/Configuration

处理器组名称,*list_of_processors

A processor group is a way to run the poller to only execute a subset of

通过在轮询器命令行参数中传递组的名称来处理处理器。

You specify the name of the processor as its underscored lowercase

版本。因此,如果您在一个处理器组中有一个 FooBarProcessor 和 BarFooProcessor,它看起来像这样:

    ActiveMessaging::Gateway.define do |s|
      ...
      s.processor_group :my_group, :foo_bar_processor, :bar_foo_processor
    end

The processor group is passed into the poller like the following:

    ./script/poller start -- process-group=my_group
于 2010-06-03T22:51:30.143 回答
0

我不确定 ActiveMessaging 是否支持这一点,但您可以在长处理消息到达时取消订阅其他消费者,然后在处理完后重新订阅它们。

它应该会给你想要的效果。

于 2010-06-04T00:05:38.957 回答