我正在研究 Camel 中的一种机制,该机制将从可以为真或假的标志中选择消息的端点。这是一种节流机制,在我的上游通道被淹没的情况下,它会将消息重新路由到批量摄取端点(发送到 HDFS)。
最终,我的路线如下所示:
from("queue:myqueue").bean("messageParser")
.dynamicRoute(bean(ThrottleHelper.class, 'buildEndpoint'));
from('direct:regular').to('hbase');
from('direct:throttle').to('hdfs');
我的 ThrottleHelper 类的 buildEndpoint 方法如下所示:
public static String buildEndpoint() {
synchronized(shouldThrottle) {
if(shouldThrottle)
return "direct:throttle";
else
return "direct:regular"
}
}
目前,我在类上有一个名为 checkStatus(); 的方法;设置 shouldThrottle (静态变量)。checkStatus() 每分钟在 Camel 石英计时器上运行一次。
我注意到一些奇怪的行为,我想我可能在滥用这种模式。通过对 Camel 模式实现的进一步搜索,看起来 buildEndpoint() 将在返回的每个端点被消息遍历后被调用。这是真的?或者我可以期望路径在进入“direct:throttle”或“direct:regular”后终止?
从我在网上收集的信息来看,我的方法真的应该像这样吗?
public static String buildEndpoint(Message message) {
if(message.getHeader('throttled') != null)
return null;
else
message.setHeader('throttled', true);
synchronized(shouldThrottle) {
if(shouldThrottle)
return "direct:throttle";
else
return "direct:regular"
}
}
谢谢!