0

我正在使用骆驼 2.17 和 Fuse 6.3。我有一个场景,我需要将我的消息发送到多个端点,每个端点都进行一些 dB 记录。

我正在使用“recipientList”将我的消息发送到这些端点,但在这里我面临一个问题,我的数据库条目不是按照端点调用的顺序排列的。

假设,我有 3 个端点 A、B、C,并且在处理 MessageA、MessageB 和 MessageC 之后,它们在 DB 中分别记录 1 条消息,但是当我运行我的路由 RL(a,b,c) 时,我看不到相同的 DB 消息尽管这些是直接端点。

有没有办法让我的路线等待第一个端点完成然后处理第二个端点?

这是我的路线示例

<route id="RouteStart">
        <from id="_from1" uri="file:.../>
        <bean id="_bean1" method="hasReadWriteAccess" ref="fileAccessValidator"/>
        <log id="_log1" message=${body}"/>
        <recipientList id="_recipientList1">
            <simple>FirstStepInsert, MessageDirQueue, Step1Queue</simple>
        </recipientList>
    </route>
    <!-- This route picks the file from queue, encodes the file in to UTF-8 BOM and validates file against XSD -->
    <route id="Step1Route">
        <from id="_from2" uri="Step1Queue"/>
        <log id="_log2" message=${body}"/>
         <setProperty id="_setProperty4" propertyName="stepName">
            <simple>File Validation</simple>
        </setProperty>
        <doTry id="_doTry1">
            <to id="_to1" uri="Step2Queue"/>
            <doCatch id="_doCatch1">
                <exception>org.apache.camel.ValidationException</exception>
                <to id="_to2" uri="file:..?autoCreate=true"/>
                <log id="_log7" message="Moved Invalid file ${file:name}"/>
            </doCatch>
            <doCatch id="_doCatch2">
                <exception>java.io.IOException</exception>
                <to id="_to3" uri="file:..?autoCreate=true"/>
                <log id="_log8" message="Moved XML file ${file:name} with Incorrect access"/>
            </doCatch>
            <doFinally id="_doFinally1">
                <to id="_to4" uri="direct:stepInsertLogging"/>
            </doFinally>
        </doTry>
    </route>
    <route id="Step2Route">
        <from id="_from3" uri="Step2Queue"/>
        <log id="_log9" message="${body}"/>
        <setProperty id="_setProperty12" propertyName="stepName">
            <simple>Data Transformation</simple>
        </setProperty>
        <recipientList id="_recipientList2">
            <simple>direct:stepInsertLogging, ReceiveDirQueue, Step3Queue</simple>
        </recipientList>
    </route>
    <route id="Step3Route">
        <from id="_from4" uri="Step3Queue"/>
        <setProperty id="_setProperty19" propertyName="stepName">
            <simple>File Delivered</simple>
        </setProperty>
        <to id="_to5" uri="file:..."/>
        <onException id="_onException1">
            <exception>java.io.IOException</exception>
            <redeliveryPolicy maximumRedeliveries="2" redeliveryDelay="0"/>
        <recipientList id="_recipientList3">
            <simple>direct:stepInsertLogging, direct:flowUpdateLogging</simple>
        </recipientList>
    </route>
    <!-- This route sends a copy of source file to Message Archive folder -->
    <route id="MessageDirRoute">
        <from id="_from9" uri="MessageDirQueue"/>
        <log id="_log23" message="${body}"/>
        <setProperty id="_setProperty63" propertyName="stepName">
            <simple>Data SourceFile Logging</simple>
        </setProperty>
        <to id="_to13" uri="file:.."/>
        <onException id="_onException4">
            <exception>java.io.IOException</exception>
            <handled>
                <constant>true</constant>
            </handled>
        <to id="_to14" uri="direct:stepInsertLogging"/>
    </route>

    <!-- This route sends a copy of destination file to Message Archive folder -->
    <route id="receiveDirectory-route">
        <from id="_from10" uri="ReceiveDirQueue"/>
        <setProperty id="_setProperty77" propertyName="stepName">
            <simple>Data DestinationFile Logging</simple>
        </setProperty>
        <to id="_to15" uri="file:.."/>
        <onException id="_onException5">
            <exception>java.io.IOException</exception>
            <redeliveryPolicy maximumRedeliveries="2" redeliveryDelay="0"/>
            <handled>
                <constant>true</constant>
            </handled>
        <to id="_to16" uri="direct:stepInsertLogging"/>
    </route>
    <!-- Event Logging Routes -->
    <route id="startMainLogRoute">
        <from id="_from11" uri="FirstStepInsert"/>
        <log id="_log25" message="Received File ${file:name} : ${body}"/>
        <setProperty id="_setProperty92" propertyName="stepName">
            <simple>File Received</simple>
        </setProperty>
        <log id="_log26" message="${property.flowId}"/>
        <recipientList id="_recipientList7" streaming="false">
            <simple>direct:flowInsertLogging, direct:stepInsertLogging</simple>
        </recipientList>
    </route>
    <route id="flowInsertLogRoute">
        <from id="_from12" uri="direct:flowInsertLogging"/>
        <log id="_log27" message="[flowInsertLogRoute]  Flow Id is ${property.flowId}"/>
        <process id="_process8" ref="flowProcessor"/>
        <transform id="_transform1">
            <method method="getFlowMap" ref="flowMapper"/>
        </transform>
        <log id="_log28" message="Executing the query {{sql.insertFlowDetail}}"/>
        <to id="_to17" uri="sql:{{sql.insertFlowDetail}}"/>
    </route>
    <route id="stepInsertLogRoute">
        <from id="_from13" uri="direct:stepInsertLogging"/>
        <log id="_log29" message="[stepInsertLogRoute] Flow Id is ${property.flowId}"/>
        <process id="_process9" ref="stepProcessor"/>
        <transform id="_transform2">
            <method method="getStepMap" ref="stepMapper"/>
        </transform>
        <log id="_log30" message="Executing the query {{sql.insertStepDetail}}"/>
        <to id="_to18" uri="sql:{{sql.insertStepDetail}}"/>
    </route>
    <route id="flowUpdateLogRoute">
        <from id="_from14" uri="direct:flowUpdateLogging"/>
        <log id="_log31" message="[flowUpdateLogRoute] Flow Id is ${property.flowId}"/>
        <transform id="_transform3">
            <method method="getFlowUpdateMap" ref="flowUpdateMapper"/>
        </transform>
        <log id="_log32" message="Executing the query {{sql.updateFlowDetail}}"/>
        <to id="_to19" uri="sql:{{sql.updateFlowDetail}}"/>
    </route>
</camelContext>

在这里,我正在更新我的数据库中未遵循执行顺序的“ stepName ”属性。

它应该是:

  1. 收到文件
  2. 数据源文件记录
  3. 文件验证
  4. 数据转换
  5. 数据目标文件记录
  6. 文件已交付

但我在数据库条目中看不到这个顺序。

4

1 回答 1

1

Themis 和 NoMad17 已经在评论中说。从文档

收件人将收到同一 Exchange 的副本,Camel 将按顺序执行它们。

我在您的路线中看到您正在链接其他收件人列表。也许你错过了一些东西,因为它有很多路线。为了验证收件人列表中的顺序,以这个单元测试为例:

public class RecipientListSequenceAggregateRouteTest extends CamelTestSupport {

    EmbeddedDatabase db;

    @Before
    public void setUp() throws Exception {
        db = new EmbeddedDatabaseBuilder()
            .setType(EmbeddedDatabaseType.DERBY).addScript("sql/test.sql").build();

        super.setUp();
    }

    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {

            @Override
            public void configure() throws Exception {
                getContext().getComponent("sql", SqlComponent.class).setDataSource(db);

                from("direct:start")
                    .recipientList(constant("direct:a1, direct:a2, direct:a3, direct:select"));

                from("direct:a1")
                    .log("got message in a1: waiting 3s")
                    .process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            Thread.sleep(3000);
                        }
                    })
                    .setBody(constant("a1"))
                    .recipientList(constant("direct:db, direct:flow"));

                from("direct:a2")
                    .log("got message in a2: waiting 5s")
                    .process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            Thread.sleep(5000);
                        }                   
                    })
                    .setBody(constant("a2"))
                    .recipientList(constant("direct:db, direct:flow"));

                from("direct:a3")
                    .log("got message in a3: waiting 1s")
                    .process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            Thread.sleep(1000);
                        }
                    });

                from("direct:db")
                    .log("got message in db from ${body}")
                    .setBody(simple("db_${in.body}"))
                    .to("sql:insert into log (body_in) values (:#${in.body})");
                    ;

                from("direct:flow")
                    .log("got message in flow from ${body}")
                    .setBody(simple("flow_${in.body}"))
                    .to("sql:insert into log (body_in) values (:#${in.body})");
                    ;

                from("direct:select")
                    .to("sql:select * from log order by time_in")
                    .log("results:\n ${body}")
                    .to("mock:result");
            }
        };
    }

    @Test
    public void test() throws InterruptedException {
        getMockEndpoint("mock:result").expectedMessageCount(1);
        Object results = template.requestBody("direct:start", "");
        assertNotNull(results);
        assertMockEndpointsSatisfied();
    }
}

结果是:

19:23:47.090 [main] INFO route2 - got message in a1: waiting 3s
19:23:50.093 [main] INFO route5 - got message in db from a1
19:23:50.315 [main] INFO route6 - got message in flow from a1
19:23:50.337 [main] INFO route3 - got message in a2: waiting 5s
19:23:55.343 [main] INFO route5 - got message in db from a2
19:23:55.351 [main] INFO route6 - got message in flow from a2
19:23:55.359 [main] INFO route4 - got message in a3: waiting 1s
19:23:56.428 [main] INFO route7 - results:
[{BODY_IN=db_a1, TIME_IN=2017-12-22 19:23:50.301}, {BODY_IN=flow_a1, TIME_IN=2017-12-22 19:23:50.334}, {BODY_IN=db_a2, TIME_IN=2017-12-22 19:23:55.348}, {BODY_IN=flow_a2, TIME_IN=2017-12-22 19:23:55.356}]

如您所见,即使链接其他收件人列表,邮件也是按顺序传递的。你能从你的路线中取出一些样本并查看日志吗?尝试逐步分析。看到你的 DSL,我看不出有什么问题。

如果您需要处理来自收件人列表的回复,您可能需要一个自定义Aggregator

一个 AggregationStrategy 将来自收件人的回复组装成来自收件人列表的单个传出消息。默认情况下,Camel 将使用最后的回复作为传出消息。从 Camel 2.12 开始,您还可以使用 POJO 作为 AggregationStrategy,有关更多详细信息,请参阅 Aggregator 页面。如果从 AggregationStrategy 中的聚合方法引发异常,则默认情况下,错误处理程序不会处理该异常。如果启用 shareUnitOfWork 选项,则可以启用错误处理程序做出反应。

于 2017-12-22T21:37:36.060 回答