3

我有一个带有拆分器(使用流式传输)的骆驼路线,它将消息发送到要处理的 seda 队列。当我试图轻轻停止应用程序时,seda 队列不会立即停止,它正在处理所有消息,然后最终关闭。我能做些什么来立即阻止它?

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.ExpressionBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.main.Main;

public class MySedaShutdownTest extends RouteBuilder  {

@Override
public void configure() throws Exception {
    onException(Exception.class)
        .process(new Processor() {

            @Override
            public void process(Exchange exchange) throws Exception {
                System.out.println("exception");
            }

        });

    from("timer:myTimer?repeatCount=1")
        .split(ExpressionBuilder.beanExpression(new MySplitter(), "myIterator"))
                .streaming()
                .to("seda:mySeda");

    from("seda:mySeda")
        .throttle(1)
        .process(new Processor() {

            @Override
            public void process(Exchange exchange) throws Exception {
                System.out.println("processing: " + exchange.getIn().getBody()
                        + "; app status: " + exchange.getContext().getStatus());

            }

        });
}

public static class MySplitter {

    public Iterator<String> myIterator() {
        List<String> values = new ArrayList<String>();
        for (int i = 0; i < 10; i++) {
            values.add("string nr : " + i);
        }
        System.out.println("in myIterator");
        return values.iterator();
    }
}

public static void main(String[] a) throws Exception {
    final Main main = new Main();
    new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(4);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                System.out.println("invoking shutdown");
                main.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }).start();
    System.out.println("starting app");
    main.enableHangupSupport();
    main.addRouteBuilder(new MySedaShutdownTest());
    main.run();
}

}

4

2 回答 2

1

SedaEndpoint 上有一个 purgeQueue 方法。因此,您可以获取端点并调用此方法。您也可以从 JMX 访问它。

有点相关,我们有这张票需要改进

我为此记录了一张票

于 2013-05-29T06:29:31.770 回答
0

只需在下面添加字符串作为configure()方法的第一行:

getContext().getShutdownStrategy().setTimeout(1);

它将关闭超时从 300 秒(默认)减少到 1

查看更多关于控制路由启动和关闭的信息。

于 2013-05-28T12:00:07.090 回答