12

我是WSO2 ESB中的新手,我已经实现了具有此特定行为的自定义消息处理器:在从消息存储中检索元素之后和执行与此消息处理器相关的序列之前执行操作

我试着详细解释一下。

这是我的 ESB 消息处理器定义:

<?xml version="1.0" encoding="UTF-8"?>
<!---<messageProcessor class="org.apache.synapse.message.processor.impl.sampler.SamplingProcessor" messageStore="transferFromMessageStore" name="transferFromMessageProcessor" xmlns="http://ws.apache.org/ns/synapse">-->
<messageProcessor class="com.mycompany.toolkit.messageprocessor.SamplingProcessorHeaderRateLimitation" messageStore="transferFromMessageStore" name="transferFromMessageProcessor" xmlns="http://ws.apache.org/ns/synapse">
    <parameter name="sequence">transferProcessorSequence</parameter>
    <parameter name="interval">1000</parameter>
    <parameter name="is.active">true</parameter>
    <parameter name="concurrency">1</parameter>
</messageProcessor>

它从transferFromMessageStore(一个队列)中检索一些元素(XML 文档)并将这个对象传递给使用它的transferProcessorSequence.xml序列。正如您在此时看到的,我已经实现了一个自定义消息处理器SamplingProcessorHeaderRateLimit,它只是扩展了org.apache.synapse.message.processor.impl.sampler.SamplingProcessor WSO2 类。此时它仅在执行init()方法时显示日志。我将它部署在我的 Carbon 服务器上,它可以工作。

在这里您可以找到整个项目代码。

好的,但据我了解,为了获得所需的行为,我不必简单地扩展SamplingProcessor类,因为为了在每个消息消费和发送到序列之间进行自定义实现,需要扩展SamplingService类,这个

我认为我需要覆盖execute()fetch(MessageConsumer msgConsumer)

此时应该可以插入一个日志,每次从消息存储中检索一个元素并且在执行与消息处理器相关的序列之前写入日志文件的内容。

可能吗?

所以我的主要主要问题是:

1)我是否必须创建一个类,将SamplingService类扩展到我正在实现自定义消息处理器的同一个项目中(此行为只能用于我的 WSO2 ESB 项目中的这个特定消息处理器,所有其他消息处理器使用在这个项目中必须使用标准的 SamplingService 实现)。

2)另一个疑问与此自定义SamplingService实现如何传递给我的自定义消息处理器有关。进入 SamplingProcessor WSO2 类(如何将特定的自定义消息处理器实现与处理其生命周期的自定义 SamplingService 实现相关联)。

4

3 回答 3

1
1) Have I to create a class extending the SamplingService class into the same project in which I am implementing my custom message processor (this behavior have to be used only for this specific message processor in my WSO2 ESB project, all the other message processor used in this project have to use the standard SamplingService implementation).

您的自定义SamplingProcessorHeaderRateLimitation只会使用传入的消息,transferFromMessageStore并且只会将其使用和处理的消息注入到 sequence 中transferProcessorSequence。此消息处理器不会处理所有其他路径。

2) Another doubt is related about how this custom SamplingService implementation is passed to my custom message processor. Into the the SamplingProcessor WSO2 class (how to associate a specific custom message processor implementation with a custom SamplingService implementation handling its lifecycle).

如果您查看您实现的源代码SamplingProcessorHeaderRateLimitation.getTask()您已将您的自定义SamplingService2与您的自定义绑定SamplingProcessorHeaderRateLimitation

@Override
protected Task getTask() {
    logger.info("getTask() START");
    System.out.println("getTask() START");
    logger.info("getTask() END");
    System.out.println("getTask() END");
    return (Task) new SamplingService2(this, synapseEnvironment, CONCURRENCY, SEQUENCE, isProcessorStartAsDeactivated());
}
于 2017-07-28T11:28:51.333 回答
0

您是否在 XML 中配置了 Task ?

<task name="CheckPrice" class="org.wso2.esb.tutorial.tasks.PlaceStockOrderTask"> 
  <trigger interval="5000"/>
</task>

或者

<task name="CheckPrice" class="org.wso2.esb.tutorial.tasks.PlaceStockOrderTask">
  <trigger interval="5000" count="10"/>
</task>
于 2017-07-28T23:08:05.770 回答
0

Q1:

我是否要创建一个类,将 SamplingService 类扩展到我正在实现我的自定义消息处理器的同一个项目中(此行为必须仅用于我的 WSO2 ESB 项目中的此特定消息处理器,在此使用的所有其他消息处理器项目必须使用标准的 SamplingService 实现)。

<messageProcessor class="com.mycompany.toolkit.messageprocessor.SamplingProcessorHeaderRateLimitation" messageStore="transferFromMessageStore" name="transferFromMessageProcessor" xmlns="http://ws.apache.org/ns/synapse">
<parameter name="sequence">transferProcessorSequence</parameter>

自定义处理器和服务仅在指定为处理器类时才会使用,如上面的示例所示。

Q2:

另一个疑问与此自定义 SamplingService 实现如何传递给我的自定义消息处理器有关。进入 SamplingProcessor WSO2 类(如何将特定的自定义消息处理器实现与处理其生命周期的自定义 SamplingService 实现相关联)。

您可以直接扩展/实现 ScheduledMessageProcessor 而不是 SamplingProcessor,因为它只实现 getTask() 并在 init() 方法中初始化视图对象。此外,您的SamplingService2类应该扩展Task、 ManagedLifecycle 接口。

于 2017-08-05T04:47:38.573 回答