当使用拆分器(splitter)和路由单(routing slip)将消息体的各个部分路由到不同的端点时,我发现必须在 .routingSlip() 之后加上一个 .end(),才能避免把拆分块之外的内容也包含进拆分块。
期望的行为是:拆分消息体,使用路由单将每个部分路由到不同的端点;拆分块完成后,继续处理拆分之前的那个交换(及其消息体)。
测试代码中有两条几乎相同的路由,唯一的区别是 .routingSlip() 之后是否带有 .end()。运行测试时可以看到,带有 .end() 的那条路由会输出 3 条内部处理器消息和 1 条外部处理器消息;拆分块完成后,它的有效负载类型也是正确的。而另一个测试使用第二条路由(routingSlip() 之后没有 .end()),会输出 3 组交错出现的内部和外部处理器消息。
也许是我漏看了文档中的某些内容,但我找不到任何以这种方式同时使用拆分器和 routingSlip 的示例,能提醒我需要加上 .end() 才能让它按照我的意图运行。如果这不是一个缺陷,我建议在文档中更醒目地说明这个问题。我本可以更早发现它,但我的原始代码使用了一个自定义拆分器,因此很难看出问题出在这里,而不是出在我自己的代码上。
我也不知道这个问题是否也适用于收件人列表或动态路由器。
package org.apache.camel.processor.routingslip;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.commons.io.FileUtils;
import org.junit.BeforeClass;
import org.junit.Test;
/**
 * Reproduces how a splitter combined with a routing slip behaves with and
 * without an explicit {@code .end()} placed directly after
 * {@code .routingSlip(...)}.
 *
 * <p>Two routes are registered that differ only in that one {@code .end()}
 * call. Each test sends the same three-line list to one of the routes and
 * counts how many exchanges reach {@code mock:end}.
 */
public class SpliterRoutingSlipTest extends CamelTestSupport {

    private static final String TEST_DIR = "target/test";

    private static final String TEST_OUT_ENDPOINT_WEND = "file:" + TEST_DIR + "/Wend";
    private static final String TEST_OUT_ENDPOINT_WOEND = "file:" + TEST_DIR + "/WOend";

    private static final String TEST_ROUTE_ID_WEND = "splitBodyTestWEnd";
    private static final String TEST_ROUTE_ID_WOEND = "splitBodyTestWOEnd";

    private static final String TEST_IN_ENDPOINT_WEND = "direct:" + TEST_ROUTE_ID_WEND;
    private static final String TEST_IN_ENDPOINT_WOEND = "direct:" + TEST_ROUTE_ID_WOEND;

    /** Name of the header the routing slip reads its destination from. */
    private static final String TEST_ROUTING_SLIP_HEADER = "toEndpoint";

    /** The message body that gets split; one element per split iteration. */
    private static final List<String> TEST_BODY = Arrays.asList(
            "This is line 1",
            "This is line 2",
            "This is line 3");

    /**
     * Removes the output directory left behind by a previous run, if any.
     *
     * @throws IOException if the directory exists but cannot be deleted
     */
    @BeforeClass
    public static void init() throws IOException {
        File staleOutput = new File(TEST_DIR);
        if (!staleOutput.exists()) {
            return;
        }
        FileUtils.forceDelete(staleOutput);
    }

    /**
     * Split plus routing slip WITH an {@code .end()} after the routing slip.
     *
     * <p>The inner processor runs once per split part, while the outer
     * processor runs a single time after the split has finished, on the
     * exchange as it was before splitting. Hence exactly one message is
     * expected at {@code mock:end}. This is the desired behavior.
     *
     * @throws Exception if sending or the mock verification fails
     */
    @Test
    public void testSplitByBodyAndRouteWithOuterPostProcessing() throws Exception {
        MockEndpoint mockEnd = getMockEndpoint("mock:end");
        mockEnd.expectedMessageCount(1);

        template.sendBodyAndHeader(
                TEST_IN_ENDPOINT_WEND, TEST_BODY, TEST_ROUTING_SLIP_HEADER, TEST_OUT_ENDPOINT_WEND);

        assertMockEndpointsSatisfied();
    }

    /**
     * Split plus routing slip WITHOUT an {@code .end()} after the routing slip.
     *
     * <p>Both the inner and the outer processor run for every split part, so
     * three messages are expected at {@code mock:end}. This is NOT the desired
     * effect; the test pins the behavior that was observed.
     *
     * @throws Exception if sending or the mock verification fails
     */
    @Test
    public void testSplitByBodyAndRouteWithIncorrectOuterPostProcessing() throws Exception {
        MockEndpoint mockEnd = getMockEndpoint("mock:end");
        mockEnd.expectedMessageCount(3);

        template.sendBodyAndHeader(
                TEST_IN_ENDPOINT_WOEND, TEST_BODY, TEST_ROUTING_SLIP_HEADER, TEST_OUT_ENDPOINT_WOEND);

        assertMockEndpointsSatisfied();
    }

    /**
     * Registers the two routes under test. They are identical except for the
     * {@code .end()} immediately following {@code .routingSlip(...)}.
     */
    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // Route 1: the routing slip is explicitly closed with end().
                from(TEST_IN_ENDPOINT_WEND).id(TEST_ROUTE_ID_WEND)
                    .split(body())
                        .process(printingProcessor("This is the INNER processor w/ end().", "\t"))
                        .setHeader(TEST_ROUTING_SLIP_HEADER, simple(TEST_OUT_ENDPOINT_WEND))
                        .setHeader("tempFileName", simple("${file:name}.tmp"))
                        .log(LoggingLevel.INFO, "Destination endpoint for filename ${file:name} is ${header.toEndpoint}")
                        .routingSlip(header(TEST_ROUTING_SLIP_HEADER))
                        .end() // the end() under test: placed right after the routing slip
                        .log(LoggingLevel.INFO, "Sent body to ${header.toEndpoint}/${file:name}")
                    .end() // intended to close the split
                    .process(printingProcessor("This is the OUTER processor w/ end().", ""))
                    .to("mock:end")
                    .end();

                // Route 2: same as above, but the end() after routingSlip is omitted.
                from(TEST_IN_ENDPOINT_WOEND).id(TEST_ROUTE_ID_WOEND)
                    .split(body())
                        .process(printingProcessor("This is the INNER processor W/O end().", "\t"))
                        .setHeader(TEST_ROUTING_SLIP_HEADER, simple(TEST_OUT_ENDPOINT_WOEND))
                        .setHeader("tempFileName", simple("${file:name}.tmp"))
                        .log(LoggingLevel.INFO, "Destination endpoint for filename ${file:name} is ${header.toEndpoint}")
                        .routingSlip(header(TEST_ROUTING_SLIP_HEADER))
                        // no .end() here on purpose -- this is the difference being demonstrated
                        .log(LoggingLevel.INFO, "Sent body to ${header.toEndpoint}/${file:name}")
                    .end() // intended to close the split; the test shows the steps below still run per part
                    .process(printingProcessor("This is the OUTER processor W/O end().", ""))
                    .to("mock:end")
                    .end();
            }
        };
    }

    /**
     * Builds a processor that prints a banner line followed by the incoming
     * message, its body and the body's class to standard output.
     *
     * @param banner first line printed on every invocation
     * @param indent text prepended to each of the three detail lines
     * @return a processor that only prints and does not modify the exchange
     */
    private static Processor printingProcessor(final String banner, final String indent) {
        return new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                System.out.println(banner);
                Message message = exchange.getIn();
                System.out.println(indent + "in=" + message);
                Object payload = message.getBody();
                System.out.println(indent + "body=" + payload);
                System.out.println(indent + "body.class=" + payload.getClass());
            }
        };
    }
}