1

我在 Camel 中定义了一条路线,如下所示: GET 请求进来,在文件系统中创建了一个文件。文件消费者拾取它,从外部 Web 服务获取数据,并通过 POST 将结果消息发送到其他 Web 服务。

简化代码如下:

    // Update request goes on queue:
    from("restlet:http://localhost:9191/update?restletMethod=post")
    .routeId("Update via POST")
    [...some magic that defines a directory and file name based on request headers...]
    .to("file://cameldest/queue?allowNullBody=true&fileExist=Ignore")

    // Update gets processed
    from("file://cameldest/queue?delay=500&recursive=true&maxDepth=2&sortBy=file:parent;file:modified&preMove=inprogress&delete=true")
    .routeId("Update main route")
    .streamCaching() //otherwise stuff can't be sent to multiple endpoints
    [...enrich message from some web service using http4 component...]
    .multicast()
        .stopOnException()
        .to("direct:sendUpdate", "direct:dependencyCheck", "direct:saveXML")
    .end();

多播中的三个端点只是将生成的消息发布到其他 Web 服务。

cameldest当队列(即文件目录)相当空时,这一切都很好。正在创建文件cameldest/<subdir>,由文件使用者拾取并移入cameldest/<subdir>/inprogress,并且正在将内容发送到三个传出 POST 没有问题。

但是,一旦传入的请求堆积到大约 300,000 个文件,进度就会减慢,最终管道会由于内存不足错误(超出 GC 开销限制)而失败。

通过增加日志记录,我可以看到文件消费者轮询基本上从不运行,因为它似乎对每次看到的所有文件负责,等待它们完成处理,然后才开始另一轮轮询。除了(我假设)导致资源瓶颈之外,这也干扰了我的排序要求:一旦队列被成千上万条等待处理的消息堵塞,那么天真地排序更高的新消息 - 如果它们仍然被拾取- 仍在那些已经“开始”的人后面等待。

现在,我尝试了maxMessagesPerPollandeagerMaxMessagesPerPoll选项。起初,它们似乎缓解了这个问题,但经过几轮投票后,我仍然发现有数千个文件处于“已启动”的边缘。

唯一有效的方法是使瓶颈delay变得maxMessages...如此狭窄,以至于处理的平均完成速度比文件轮询周期快。

显然,这不是我想要的。我希望我的管道尽可能快地处理文件,但不是更快。我期待文件使用者在路线繁忙时等待。

我犯了一个明显的错误吗?

(如果这是问题的一部分,我正在使用 XFS 的 Redhat 7 机器上运行稍旧的 Camel 2.14.0。)

4

3 回答 3

2

尝试在 from 文件端点上将 maxMessagesPerPoll 设置为较低的值,以便每次轮询最多只拾取 X 个文件,这也限制了您在 Camel 应用程序中将拥有的飞行消息的总数。

您可以在文件组件的 Camel 文档中找到有关该选项的更多信息

于 2017-02-19T08:02:56.277 回答
1

简短的回答是没有答案:sortByCamel 文件组件的选项内存效率太低,无法容纳我的用例:

  • 唯一性:如果文件已经存在,我不想将其放入队列中。
  • 优先级:应首先处理标记为高优先级的文件。
  • 性能:拥有几十万个文件,甚至几百万个文件应该没问题。
  • FIFO:(奖励)最旧的文件(按优先级)应该首先被拾取。

问题似乎是,如果我正确阅读了源代码文档,所有文件详细信息都在内存中以执行排序,无论是使用内置语言还是自定义插件sorter。文件组件总是创建一个包含所有细节的对象列表,当经常轮询许多文件时,这显然会导致大量的垃圾收集开销

我使用以下步骤使我的用例大部分工作,而不必求助于使用数据库或编写自定义组件:

  • 从对子目录(之前)cameldest/queue中的文件进行递归排序的父目录上的一个文件使用者移动到两个使用者,每个目录一个,根本没有排序。cameldest/queue/high/cameldest/queue/low/
  • 通过我的实际业务逻辑设置消费者来处理文件。/cameldest/queue/high/
  • 设置消费者从/cameldest/queue/low简单地将文件从“低”提升到“高”(复制它们,即.to("file://cameldest/queue/high");
  • 至关重要的是,为了仅在 high 不忙时从“low”提升到“high”,请将路由策略附加到“high”,以限制其他路由,即如果“high”中有任何消息在进行中,则为“low” "
  • 此外,我ThrottlingInflightRoutePolicy在“high”中添加了一个,以防止它同时进行过多的交换。

想象一下这就像在机场办理登机手续时,如果商务舱通道是空的,游客会被邀请进入商务舱通道。

这就像负载下的魅力一样,即使数十万个文件在“低”队列中,直接放入“高”的新消息(文件)也会在几秒钟内得到处理。

该解决方案没有涵盖的唯一要求是有序性:不能保证首先拾取较旧的文件,而是随机拾取它们。可以想象这样一种情况:源源不断的传入文件可能导致某个特定文件 X 总是不走运并且永远不会被拾取。不过,这种情况发生的可能性非常低。

可能的改进:目前允许/暂停将文件从“低”提升到“高”的阈值设置为“高”中的 0 条消息。一方面,这保证了在执行从“低”的另一个提升之前,将处理掉到“高”的文件,另一方面,它会导致一些停止-启动模式,尤其是在多线程中设想。虽然不是一个真正的问题,但其表现令人印象深刻。


来源:

我的路线定义:

    ThrottlingInflightRoutePolicy trp = new ThrottlingInflightRoutePolicy();
    trp.setMaxInflightExchanges(50);

    SuspendOtherRoutePolicy sorp = new SuspendOtherRoutePolicy("lowPriority");

    from("file://cameldest/queue/low?delay=500&maxMessagesPerPoll=25&preMove=inprogress&delete=true")
    .routeId("lowPriority")
    .log("Copying over to high priority: ${in.headers."+Exchange.FILE_PATH+"}")
    .to("file://cameldest/queue/high");

    from("file://cameldest/queue/high?delay=500&maxMessagesPerPoll=25&preMove=inprogress&delete=true")
    .routeId("highPriority")
    .routePolicy(trp)
    .routePolicy(sorp)
    .threads(20)
    .log("Before: ${in.headers."+Exchange.FILE_PATH+"}")
    .delay(2000) // This is where business logic would happen
    .log("After: ${in.headers."+Exchange.FILE_PATH+"}")
    .stop();

我的SuspendOtherRoutePolicy,松散的结构ThrottlingInflightRoutePolicy

public class SuspendOtherRoutePolicy extends RoutePolicySupport implements CamelContextAware {

    private CamelContext camelContext;
    private final Lock lock = new ReentrantLock();
    private String otherRouteId;

    public SuspendOtherRoutePolicy(String otherRouteId) {
        super();
        this.otherRouteId = otherRouteId;
    }

    @Override
    public CamelContext getCamelContext() {
        return camelContext;
    }

    @Override
    public void onStart(Route route) {
        super.onStart(route);
        if (camelContext.getRoute(otherRouteId) == null) {
            throw new IllegalArgumentException("There is no route with the id '" + otherRouteId + "'");
        }
    }

    @Override
    public void setCamelContext(CamelContext context) {
        camelContext = context;
    }

    @Override
    public void onExchangeDone(Route route, Exchange exchange) {
        //log.info("Exchange done on route " + route);
        Route otherRoute = camelContext.getRoute(otherRouteId);
        //log.info("Other route: " + otherRoute);
        throttle(route, otherRoute, exchange);
    }

    protected void throttle(Route route, Route otherRoute, Exchange exchange) {
        // this works the best when this logic is executed when the exchange is done
        Consumer consumer = otherRoute.getConsumer();

        int size = getSize(route, exchange);
        boolean stop = size > 0;
        if (stop) {
            try {
                lock.lock();
                stopConsumer(size, consumer);
            } catch (Exception e) {
                handleException(e);
            } finally {
                lock.unlock();
            }
        }

        // reload size in case a race condition with too many at once being invoked
        // so we need to ensure that we read the most current size and start the consumer if we are already to low
        size = getSize(route, exchange);
        boolean start = size == 0;
        if (start) {
            try {
                lock.lock();
                startConsumer(size, consumer);
            } catch (Exception e) {
                handleException(e);
            } finally {
                lock.unlock();
            }
        }
    }

    private int getSize(Route route, Exchange exchange) {
        return exchange.getContext().getInflightRepository().size(route.getId());
    }

    private void startConsumer(int size, Consumer consumer) throws Exception {
        boolean started = super.startConsumer(consumer);
        if (started) {
            log.info("Resuming the other consumer " + consumer);
        }
    }

    private void stopConsumer(int size, Consumer consumer) throws Exception {
        boolean stopped = super.stopConsumer(consumer);
        if (stopped) {
            log.info("Suspending the other consumer " + consumer);
        }
    }
}
于 2017-02-24T10:34:36.920 回答
0

除非您确实需要将数据另存为文件,否则我会提出一个替代解决方案。

从您的 restlet 消费者,将每个请求发送到消息队列应用程序,例如 activemq 或 rabbitmq 或类似的东西。您很快就会在该队列上收到大量消息,但这没关系。

然后用队列消费者替换您的文件消费者。这将需要一些时间,但每条消息都应单独处理并发送到您想要的任何地方。我用大约 500 000 条消息测试了 rabbitmq,效果很好。这也应该减少消费者的负担。

于 2017-02-20T08:07:55.130 回答