I am studying WSO2 CEP and trying out some scenarios that match my requirements.
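First, I ran the sample explained at this link, KPI Analyzer, and it worked: I saw the expected results. After that, I wanted to slightly change the query in that sample's bucket from this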
from phoneRetailStream[totalPrice>2500 and quantity>3]
insert into highPurchaseStream
buyer, brand, quantity, totalPrice;
to this
from phoneRetailStream#window.length(5)
insert into highPurchaseStream
sum(quantity) as quantitySum, sum(totalPrice) as totalpriceSum
group by brand;
and I changed the tuple mapping accordingly. But this configuration always gives me the following error:
[java] Wrongly formatted event sent for carbon.super
[java] org.wso2.carbon.databridge.core.exception.EventConversionException: Error when converting org.wso2.high.purchase.buyers.new:1.6.0 of event bundle with events 4
[java] at org.wso2.carbon.databridge.receiver.thrift.converter.ThriftEventConverter.createEventList(ThriftEventConverter.java:126)
[java] at org.wso2.carbon.databridge.receiver.thrift.converter.ThriftEventConverter.toEventList(ThriftEventConverter.java:88)
[java] at org.wso2.carbon.databridge.core.internal.queue.QueueWorker.run(QueueWorker.java:72)
[java] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
[java] at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
[java] at java.util.concurrent.FutureTask.run(FutureTask.java:138)
[java] at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
[java] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
[java] at java.lang.Thread.run(Thread.java:662)
[java] Caused by: java.lang.NullPointerException
[java] at org.wso2.carbon.databridge.receiver.thrift.converter.ThriftEventConverter.toObjectArray(ThriftEventConverter.java:49)
[java] at org.wso2.carbon.databridge.receiver.thrift.converter.ThriftEventConverter.createEventList(ThriftEventConverter.java:116)
[java] ... 8 more
Apart from the jmsbroker sample, I cannot use Siddhi's aggregate functions (sum, avg, etc.) in any of these samples. What could be wrong in this case?
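To make clear what I expect the modified query to compute, here is a minimal Python sketch of a length-5 sliding window with per-brand sums. It is not Siddhi code, and it rests on two assumptions of mine: that the window holds the last five events across all brands (not five per brand), and that one output is emitted for each arriving event. The function name `sliding_sums` and the sample events are made up for illustration, and the brand is included in the output only for readability (my query selects just the two sums).

```python
from collections import defaultdict, deque


def sliding_sums(events, length=5):
    """Yield (brand, quantity_sum, total_price_sum) for each arriving event.

    Assumption: a single window of `length` events is shared by all brands,
    and the sums are grouped by brand over the events currently in it.
    """
    window = deque()                    # events currently inside the window
    sums = defaultdict(lambda: [0, 0])  # brand -> [quantity sum, price sum]
    for brand, quantity, total_price in events:
        if len(window) == length:
            # The oldest event expires: subtract it from its brand's sums.
            old_brand, old_quantity, old_price = window.popleft()
            sums[old_brand][0] -= old_quantity
            sums[old_brand][1] -= old_price
        window.append((brand, quantity, total_price))
        sums[brand][0] += quantity
        sums[brand][1] += total_price
        yield brand, sums[brand][0], sums[brand][1]


# Hypothetical sample events: (brand, quantity, totalPrice)
events = [
    ("Nokia", 2, 400),
    ("Apple", 1, 700),
    ("Nokia", 3, 600),
    ("Apple", 2, 1400),
    ("Nokia", 1, 200),
    ("Apple", 4, 2800),
]
results = list(sliding_sums(events))
for row in results:
    print(row)
```

With the sixth event the first Nokia event drops out of the window, so only the three Apple events contribute to the last Apple sum.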
@Mohanadarshan
Here is the latest version of my bucket XML file:
<bucket name="KPIAnalyzer" xmlns="http://wso2.org/carbon/cep">
    <description>
        Notifies when a user purchases more than 3 phones for the total price higher than $2500.
    </description>
    <engineProviderConfiguration engineProvider="SiddhiCEPRuntime">
        <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
        <property name="siddhi.enable.distributed.processing">false</property>
    </engineProviderConfiguration>
    <input topic="org.wso2.phone.retail.store/1.2.0" brokerName="localAgentBroker">
        <tupleMapping stream="phoneRetailStream" queryEventType="Tuple">
            <property name="brand" inputName="brand" inputDataType="payloadData"
                      type="java.lang.String"/>
            <property name="quantity" inputName="quantity" inputDataType="payloadData"
                      type="java.lang.Integer"/>
            <property name="totalPrice" inputName="total" inputDataType="payloadData"
                      type="java.lang.Integer"/>
            <property name="buyer" inputName="buyer" inputDataType="payloadData"
                      type="java.lang.String"/>
        </tupleMapping>
    </input>
    <query name="KPIQuery">
        <expression>
            from phoneRetailStream#window.length(5)
            insert into highPurchaseStream
            sum(quantity) as quantitySum, sum(totalPrice) as totalpriceSum
            group by brand;
        </expression>
        <output topic="org.wso2.high.purchase.buyers.new/1.6.0" brokerName="externalAgentBroker">
            <tupleMapping>
                <metaData>
                </metaData>
                <correlationData/>
                <payloadData>
                    <property name="quantity" valueOf="quantitySum" type="java.lang.Integer"/>
                    <property name="purchasePrice" valueOf="totalpriceSum" type="java.lang.Integer"/>
                </payloadData>
            </tupleMapping>
        </output>
    </query>
</bucket>
Thanks for your help.