0

我有以下用例:我使用 Camel 作为 Web 服务代理,我从 A 获取请求,丰富 SOAP 标头并转发给 B。B 的响应返回给 A。(我用 A & B 替换了实际产品名称)

我的路线如下所示:

    <route id="orderLimitRoute">
        <from uri="cxf:bean:aOrderLimit?dataFormat=CXF_MESSAGE" />
        <to uri="bean:soapHeadersEnricher"/>
        <to uri="cxf:bean:bOrderLimit?dataFormat=CXF_MESSAGE" />
    </route>

像这样的路线有效,这不是问题。

丰富器从某处检索访问密钥,并放入 SOAP 标头。B 只接受具有有效访问密钥的请求。检索访问密钥是一项昂贵的操作,因此我们为此使用了一个池,B 也没有真正定义访问密钥何时变得无效。所以我们有这个(在非骆驼场景中):

public Object execute(Call call) throws AbstractBWebServiceException {
    Object result = null;
    AccessKey accessKey = null;
    int tries = maxTries > 0 ? maxTries : accessKeyPool.getMaxActive() + 1;

    for (int i = 0; i < tries && accessKey == null; i++) {
        try {
            accessKey = borrowAccessKey();
            result = call.execute(accessKey);
        } catch (BWebServiceLoginException loginException) {
            invalidate(accessKey);
            accessKey = null;
        } finally {
            if (accessKey != null) {
                returnAccessKey(accessKey);
            }
        }
    }

    if (accessKey == null) {
        throw new BWebServiceAccessKeyException("Couldn't get a valid accesskey");
    }

    return result;
}

call.execute(accessKey) 在哪里调用 B。现在我想在我的 Camel 路由中使用那段 Java 代码。要么直接,要么翻译成骆驼逻辑。但我不知道如何做到这一点,我一遍又一遍地查看 Camel 文档,但找不到任何相关内容。

4

2 回答 2

1

在 Java 代码中,您可以使用 ProducerTemplate 向任何 Camel 端点发送消息。 http://camel.apache.org/producertemplate.html

于 2013-04-07T07:27:13.420 回答
0

这是我解决它的方法,“soapHeadersEnricher”是一个 bean,它将访问密钥插入到 soapheader 中,并将其放在交换中。当访问密钥被视为无效时,我会收到一个带有特定代码的 SOAPFault。'bLoginExceptionPredicate' 检查该特定代码,并为该类型的异常启用重新传递。'onRedelivery' 从交换器获取无效的访问密钥,将其标记为对池无效,请求一个新的,在交换器和 SOAP 标头中设置它。最后的处理器将访问密钥返回到池中。

from("cxf:bean:brokerOrderLimit?dataFormat=PAYLOAD&allowStreaming=false")
            .process(soapHeadersEnricher)
            .onException(SoapFault.class)
            .onWhen(bLoginExceptionPredicate)
            .handled(false) // return fault to client
            .maximumRedeliveries(2)
            .redeliveryDelay(5000) 
            .retryAttemptedLogLevel(LoggingLevel.WARN)
            .logHandled(false)
            .onRedelivery(soapHeadersEnricher)
            .end()
            .to("cxf:bean:bOrderLimit?dataFormat=PAYLOAD&allowStreaming=false")
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    AccessKey accessKey = exchange.getProperty(BSoapHeadersEnricher.ACCESKEY_PROPERTY, AccessKey.class);
                    accessKeyProvider.returnAccessKey(accessKey);
                }
            });
于 2013-04-09T09:26:57.987 回答