0

我正在使用 wso2cep 3.0.0 和 activemq5.8.0 根据 CEP 文档,我希望使用 CEP 发布事件。为此,我使用 2 定义QUEUES来启动 activemq,其中传入消息的名称为jmsProxy ,输出消息的名称为JmsProxy 。我在 CEP lib activemq-broker-5.8.0.jar、activemq-client-5.8.0.jar、axiom 中添加了所需的 jars .jar、geronimo-j2ee-management_1.1_spec-1.0.1.jar、geronimo-jms_1.1_spec-1.1.1.jar、hawtbuf-1.2.jar、xpp3-1.1.4c.jar、xstream-1.4.4.jar

我的配置是这样的 InputEventAdaptor

<?xml version="1.0" encoding="UTF-8"?>
<inputEventAdaptor name="jmsProxy" statistics="disable" trace="enable"
  type="jms" xmlns="http://wso2.org/carbon/eventadaptormanager">
  <property name="java.naming.provider.url">tcp://localhost:61616</property>
  <property name="transport.jms.SubscriptionDurable">true</property>
  <property name="transport.jms.DurableSubscriberName">jmsProxy</property>
  <property name="transport.jms.UserName">admin</property>
  <property name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</property>
  <property name="transport.jms.Password">admin</property>
  <property name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</property>
  <property name="transport.jms.DestinationType">queue</property>
</inputEventAdaptor>

上面的传入消息将从jmsProxy队列中选择消息但它无法从 jmsProxy 队列中选择消息。我将如何启动它以将消息放入CEPoutputEventAdaptor

<?xml version="1.0" encoding="UTF-8"?>
<outputEventAdaptor name="JmsProxy" statistics="disable" trace="disable"
  type="jms" xmlns="http://wso2.org/carbon/eventadaptormanager">
  <property name="java.naming.security.principal">admin</property>
  <property name="java.naming.provider.url">tcp://localhost:61616</property>
  <property name="java.naming.security.credentials">admin</property>
  <property name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</property>
  <property name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</property>
  <property name="transport.jms.DestinationType">queue</property>
</outputEventAdaptor>

像这样的事件生成器配置

<?xml version="1.0" encoding="UTF-8"?>
<eventBuilder name="ReadingsDtoBuilder" statistics="disable"
    trace="disable" xmlns="http://wso2.org/carbon/eventbuilder">
<from eventAdaptorName="jmsProxy" eventAdaptorType="jmsProxy">
    <property name="transport.jms.Destination">JmsProxy</property>
</from>
<mapping customMapping="disable"
    parentXpath="//ReadingsLiteTaildtos" type="xml">
    <property>
        <from xpath="//ReadingsLiteTaildto/ParameterId"/>
        <to name="meta_parameterId" type="string"/>
    </property>
    <property>
        <from xpath="//ReadingsLiteTaildto/Slno"/>
        <to name="meta_slno" type="string"/>
    </property>
    <property>
        <from xpath="//ReadingsLiteTaildto/FinalValue"/>
        <to name="finalValue" type="int"/>
    </property>
    <property>
        <from xpath="//ReadingsLiteTaildto/InputText"/>
        <to name="inputText" type="string"/>
    </property>
    <property>
        <from xpath="//ReadingsLiteTaildto/InputValue"/>
        <to name="inputValue" type="double"/>
    </property>
</mapping>
<to streamName="org.sample.readings.dto.stream" version="1.0.0"/>
</eventBuilder>

执行计划可以如下。like this

<?xml version="1.0" encoding="UTF-8"?>
<executionPlan name="ReadingsAnalyzer" statistics="disable"
  trace="disable" xmlns="http://wso2.org/carbon/eventprocessor">
  <description>This execution plan analyzes readings and triggers notifications based on     threshold.</description>
  <siddhiConfiguration>
    <property name="siddhi.enable.distributed.processing">false</property>
    <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
  </siddhiConfiguration>
  <importedStreams>
    <stream as="readings" name="org.sample.readings.dto.stream" version="1.0.0"/>
  </importedStreams>
  <queryExpressions><![CDATA[from readings[finalValue > 100]
select *
insert into notificationStream;]]></queryExpressions>
  <exportedStreams>
    <stream name="notificationStream" valueOf="notificationStream" version="1.0.0"/>
  </exportedStreams>
</executionPlan>

我在 stream-manager-config.xml 中定义了流,类似于以下内容。

<streamDefinition name="org.sample.readings.dto.stream" version="1.0.0">
<metaData>
        <property name="parameterId" type="STRING"/>
        <property name="slno" type="STRING"/>
</metaData>
    <payloadData>
        <property name="finalValue" type="INT"/>
        <property name="inputText" type="STRING"/>
        <property name="inputValue" type="DOUBLE"/>
    </payloadData>
</streamDefinition>
<streamDefinition name="notificationStream" version="1.0.0">
<metaData>
        <property name="parameterId" type="STRING"/>
        <property name="slno" type="STRING"/>
</metaData>
    <payloadData>
        <property name="finalValue" type="INT"/>
        <property name="inputText" type="STRING"/>
        <property name="inputValue" type="DOUBLE"/>
    </payloadData>
</streamDefinition>.

当我向我的 jmsProxy 队列发送任何消息时,它看起来一切正常,消息没有反映给 CEP 事件,我在CEP中收到此消息

[2014-02-18 11:57:53,159]  INFO - {EventBuilderDeployer}  Event Builder undeployed successfully : ReadingsDtoBuilder.xml
[2014-02-18 11:57:53,160]  INFO - {EventBuilderDeployer}  Event builder deployment held back and in inactive state :ReadingsDtoBuilder, Waiting for Input Event Adaptor dependency :jmsProxy
[2014-02-18 12:03:58,006]  INFO - {InputEventAdaptorConfigurationFilesystemInvoker}  Input Event Adaptor configuration deleted from file system : jmsProxy.xml
[2014-02-18 12:03:58,006]  INFO - {InputEventAdaptorDeployer}  Input Event Adaptor undeployed successfully : jmsProxy.xml
[2014-02-18 12:03:58,008]  INFO - {InputEventAdaptorConfigurationFilesystemInvoker}  Input Event Adaptor configuration saved in th filesystem : jmsProxy
[2014-02-18 12:03:58,009]  INFO - {InputEventAdaptorDeployer}  Input Event Adaptor deployed successfully and in active state : jmsProxy
[2014-02-18 12:03:58,009]  INFO - {EventBuilderDeployer}  Event Builder undeployed successfully : ReadingsDtoBuilder.xml
[2014-02-18 12:03:58,009]  INFO - {EventBuilderDeployer}  Event builder deployment held back and in inactive state :ReadingsDtoBuilder, Waiting for Input Event Adaptor dependency :jmsProxy
4

1 回答 1

0

查看您的配置,似乎事件生成器配置不正确。事件生成器配置中的事件适配器类型应为“jms”。

<from eventAdaptorName="jmsProxy" eventAdaptorType="jms">

请按上述方式更正此问题以使其正常工作。您可以启用跟踪 [1] 以查看消息是否到达 CEP 的特定组件。

[1] https://docs.wso2.org/display/CEP300/CEP+Event+Tracer

于 2014-03-04T09:57:17.817 回答