1

我正在研究 Apache WSO2 CEP,我正在尝试做一些符合我要求的场景。

首先,我执行此链接KPI Analyzer中解释的示例,我成功并正确查看了结果。在那之后,我想从这里稍微改变一下那个样本中的桶

from phoneRetailStream[totalPrice>2500 and quantity>3]
insert into highPurchaseStream
buyer, brand, quantity, totalPrice; 

对此

from phoneRetailStream#window.length(5)
insert into highPurchaseStream
sum(quantity) as quantitySum, sum(totalPrice) as totalpriceSum
group by  brand;

并相应地更改了元组映射。但是这个配置总是给我错误

 [java] Wrongly formatted event sent for carbon.super
 [java] org.wso2.carbon.databridge.core.exception.EventConversionException: Error when converting org.wso2.high.purchase.buyers.new:1.6.0 of event bundle with events 4
 [java]     at org.wso2.carbon.databridge.receiver.thrift.converter.ThriftEventConverter.createEventList(ThriftEventConverter.java:126)
 [java]     at org.wso2.carbon.databridge.receiver.thrift.converter.ThriftEventConverter.toEventList(ThriftEventConverter.java:88)
 [java]     at org.wso2.carbon.databridge.core.internal.queue.QueueWorker.run(QueueWorker.java:72)
 [java]     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
 [java]     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
 [java]     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
 [java]     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
 [java]     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
 [java]     at java.lang.Thread.run(Thread.java:662)
 [java] Caused by: java.lang.NullPointerException
 [java]     at org.wso2.carbon.databridge.receiver.thrift.converter.ThriftEventConverter.toObjectArray(ThriftEventConverter.java:49)
 [java]     at org.wso2.carbon.databridge.receiver.thrift.converter.ThriftEventConverter.createEventList(ThriftEventConverter.java:116)
 [java]     ... 8 more

除了 jmsbroker 之外,我无法在这些示例中使用 siddhi 语言的聚合函数(sum、avg 等)。这种情况可能有什么问题?

@Mohanadarshan

这是我的存储桶 xml 文件的最后一个版本

<bucket name="KPIAnalyzer" xmlns="http://wso2.org/carbon/cep">
    <description>
Notifies when a user purchases more then 3 phones for the total price higher than $2500.
    </description>
    <engineProviderConfiguration engineProvider="SiddhiCEPRuntime">
        <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
        <property name="siddhi.enable.distributed.processing">false</property>
    </engineProviderConfiguration>
    <input topic="org.wso2.phone.retail.store/1.2.0" brokerName="localAgentBroker">
        <tupleMapping stream="phoneRetailStream" queryEventType="Tuple">
            <property name="brand" inputName="brand" inputDataType="payloadData"
                      type="java.lang.String"/>
            <property name="quantity" inputName="quantity" inputDataType="payloadData"
                      type="java.lang.Integer"/>
            <property name="totalPrice" inputName="total" inputDataType="payloadData"
                      type="java.lang.Integer"/>
            <property name="buyer" inputName="buyer" inputDataType="payloadData"
                      type="java.lang.String"/>
        </tupleMapping>
    </input>
    <query name="KPIQuery">
        <expression>
from phoneRetailStream#window.length(5)
insert into highPurchaseStream
sum(quantity) as quantitySum, sum(totalPrice) as totalpriceSum
group by  brand;
        </expression>
        <output topic="org.wso2.high.purchase.buyers.new/1.6.0" brokerName="externalAgentBroker">
            <tupleMapping>
                <metaData>
                </metaData>
                <correlationData/>
                <payloadData>
                    <property name="quantity" valueOf="quantitySum" type="java.lang.Integer"/>
                    <property name="purchasePrice" valueOf="totalpriceSum" type="java.lang.Integer"/>
                </payloadData>
            </tupleMapping>
        </output>
    </query>
</bucket>

感谢您的帮助。

4

1 回答 1

2

我检查了您的 siddhi 查询,它与 siddhi 引擎一起正常工作,并且查询没有任何问题...

我已经检查了 KPI 分析器示例,它对于您的查询也没有任何问题...请确保您是否有正确的输出元组映射配置..

我需要再次提及,“存储桶可编辑 UI”存在一些问题。请验证/repository/deployment/server/cepbuckets/中的bucket xml文件是否正确...(有时删除属性时,它不会正确删除)

改变

<property name="quantity" valueOf="quantitySum" type="java.lang.Long"/>
<property name="purchasePrice" valueOf="totalpriceSum" type="java.lang.Long"/>

谢谢,

莫汉

于 2013-04-17T11:29:23.933 回答