简短的回答是没有答案:sortBy
Camel 文件组件的选项内存效率太低,无法容纳我的用例:
- 唯一性:如果文件已经存在,我不想将其放入队列中。
- 优先级:应首先处理标记为高优先级的文件。
- 性能:拥有几十万个文件,甚至几百万个文件应该没问题。
- FIFO:(奖励)最旧的文件(按优先级)应该首先被拾取。
问题似乎是,如果我正确阅读了源代码和文档,所有文件详细信息都在内存中以执行排序,无论是使用内置语言还是自定义插件sorter
。文件组件总是创建一个包含所有细节的对象列表,当经常轮询许多文件时,这显然会导致大量的垃圾收集开销。
我使用以下步骤使我的用例大部分工作,而不必求助于使用数据库或编写自定义组件:
- 从对子目录(之前)
cameldest/queue
中的文件进行递归排序的父目录上的一个文件使用者移动到两个使用者,每个目录一个,根本没有排序。cameldest/queue/high/
cameldest/queue/low/
- 通过我的实际业务逻辑仅设置消费者来处理文件。
/cameldest/queue/high/
- 设置消费者从
/cameldest/queue/low
简单地将文件从“低”提升到“高”(复制它们,即.to("file://cameldest/queue/high");
)
- 至关重要的是,为了仅在 high 不忙时从“low”提升到“high”,请将路由策略附加到“high”,以限制其他路由,即如果“high”中有任何消息在进行中,则为“low” "
- 此外,我
ThrottlingInflightRoutePolicy
在“high”中添加了一个,以防止它同时进行过多的交换。
想象一下这就像在机场办理登机手续时,如果商务舱通道是空的,游客会被邀请进入商务舱通道。
这就像负载下的魅力一样,即使数十万个文件在“低”队列中,直接放入“高”的新消息(文件)也会在几秒钟内得到处理。
该解决方案没有涵盖的唯一要求是有序性:不能保证首先拾取较旧的文件,而是随机拾取它们。可以想象这样一种情况:源源不断的传入文件可能导致某个特定文件 X 总是不走运并且永远不会被拾取。不过,这种情况发生的可能性非常低。
可能的改进:目前允许/暂停将文件从“低”提升到“高”的阈值设置为“高”中的 0 条消息。一方面,这保证了在执行从“低”的另一个提升之前,将处理掉到“高”的文件,另一方面,它会导致一些停止-启动模式,尤其是在多线程中设想。虽然不是一个真正的问题,但其表现令人印象深刻。
来源:
我的路线定义:
ThrottlingInflightRoutePolicy trp = new ThrottlingInflightRoutePolicy();
trp.setMaxInflightExchanges(50);
SuspendOtherRoutePolicy sorp = new SuspendOtherRoutePolicy("lowPriority");
from("file://cameldest/queue/low?delay=500&maxMessagesPerPoll=25&preMove=inprogress&delete=true")
.routeId("lowPriority")
.log("Copying over to high priority: ${in.headers."+Exchange.FILE_PATH+"}")
.to("file://cameldest/queue/high");
from("file://cameldest/queue/high?delay=500&maxMessagesPerPoll=25&preMove=inprogress&delete=true")
.routeId("highPriority")
.routePolicy(trp)
.routePolicy(sorp)
.threads(20)
.log("Before: ${in.headers."+Exchange.FILE_PATH+"}")
.delay(2000) // This is where business logic would happen
.log("After: ${in.headers."+Exchange.FILE_PATH+"}")
.stop();
我的SuspendOtherRoutePolicy
,松散的结构ThrottlingInflightRoutePolicy
public class SuspendOtherRoutePolicy extends RoutePolicySupport implements CamelContextAware {
private CamelContext camelContext;
private final Lock lock = new ReentrantLock();
private String otherRouteId;
public SuspendOtherRoutePolicy(String otherRouteId) {
super();
this.otherRouteId = otherRouteId;
}
@Override
public CamelContext getCamelContext() {
return camelContext;
}
@Override
public void onStart(Route route) {
super.onStart(route);
if (camelContext.getRoute(otherRouteId) == null) {
throw new IllegalArgumentException("There is no route with the id '" + otherRouteId + "'");
}
}
@Override
public void setCamelContext(CamelContext context) {
camelContext = context;
}
@Override
public void onExchangeDone(Route route, Exchange exchange) {
//log.info("Exchange done on route " + route);
Route otherRoute = camelContext.getRoute(otherRouteId);
//log.info("Other route: " + otherRoute);
throttle(route, otherRoute, exchange);
}
protected void throttle(Route route, Route otherRoute, Exchange exchange) {
// this works the best when this logic is executed when the exchange is done
Consumer consumer = otherRoute.getConsumer();
int size = getSize(route, exchange);
boolean stop = size > 0;
if (stop) {
try {
lock.lock();
stopConsumer(size, consumer);
} catch (Exception e) {
handleException(e);
} finally {
lock.unlock();
}
}
// reload size in case a race condition with too many at once being invoked
// so we need to ensure that we read the most current size and start the consumer if we are already to low
size = getSize(route, exchange);
boolean start = size == 0;
if (start) {
try {
lock.lock();
startConsumer(size, consumer);
} catch (Exception e) {
handleException(e);
} finally {
lock.unlock();
}
}
}
private int getSize(Route route, Exchange exchange) {
return exchange.getContext().getInflightRepository().size(route.getId());
}
private void startConsumer(int size, Consumer consumer) throws Exception {
boolean started = super.startConsumer(consumer);
if (started) {
log.info("Resuming the other consumer " + consumer);
}
}
private void stopConsumer(int size, Consumer consumer) throws Exception {
boolean stopped = super.stopConsumer(consumer);
if (stopped) {
log.info("Suspending the other consumer " + consumer);
}
}
}