I have a question about writing a custom OutputAttributeProcessor. I am using WSO2 CEP 2.1.0 and Siddhi 1.1.0.
I want to create a custom OutputAttributeProcessor, so I wrote two Java classes: DiscomfortIndexAggregatorFactory, which implements OutputAttributeProcessorFactory, and DiscomfortIndexAggregator, which implements OutputAttributeProcessor. Both classes are in the package org.wso2.siddhi.extention.aggregator.environment.
The two source files are as follows.
DiscomfortIndexAggregatorFactory.java
package org.wso2.siddhi.extention.aggregator.environment;

import org.wso2.siddhi.core.query.projector.attribute.factory.OutputAttributeProcessorFactory;
import org.wso2.siddhi.core.query.projector.attribute.handler.OutputAttributeProcessor;
import org.wso2.siddhi.query.api.definition.Attribute.Type;
import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;

@SiddhiExtension(namespace = "environment", function = "discomfortIndex")
public class DiscomfortIndexAggregatorFactory implements
        OutputAttributeProcessorFactory {

    @Override
    public OutputAttributeProcessor createAggregator(Type type) {
        return new DiscomfortIndexAggregator();
    }

    @Override
    public ProcessorType getProcessorType() {
        return OutputAttributeProcessorFactory.ProcessorType.AGGREGATOR;
    }
}
DiscomfortIndexAggregator.java
package org.wso2.siddhi.extention.aggregator.environment;

import org.wso2.siddhi.core.query.projector.attribute.handler.OutputAttributeProcessor;
import org.wso2.siddhi.query.api.definition.Attribute.Type;

public class DiscomfortIndexAggregator implements OutputAttributeProcessor {

    private static final long serialVersionUID = -5992266303998509661L;

    @Override
    public OutputAttributeProcessor createNewInstance() {
        return new DiscomfortIndexAggregator();
    }

    @Override
    public Type getType() {
        return Type.DOUBLE;
    }

    @Override
    public Object processInEventAttribute(Object obj) {
        double discomfortIndex = -1D;
        if (obj instanceof Object[]) {
            Object[] objArray = (Object[]) obj;
            double temperature = (double) objArray[0];
            double humidity = (double) objArray[1];
            discomfortIndex = 0.81 * temperature + 0.01 * humidity
                    * (0.99 * temperature - 14.3) + 46.3;
        }
        return discomfortIndex;
    }

    @Override
    public Object processRemoveEventAttribute(Object obj) {
        double discomfortIndex = -1D;
        if (obj instanceof Object[]) {
            Object[] objArray = (Object[]) obj;
            double temperature = (double) objArray[0];
            double humidity = (double) objArray[1];
            discomfortIndex = 0.81 * temperature + 0.01 * humidity
                    * (0.99 * temperature - 14.3) + 46.3;
        }
        return discomfortIndex;
    }
}
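For reference, the calculation itself can be checked in isolation from Siddhi. The following is a minimal sketch with no WSO2 dependencies that evaluates the same expression used in both process methods above. The class name DiscomfortIndexCheck, the method name discomfortIndex, and the sample inputs are hypothetical and exist only for this illustration; they are not part of the extension jar.

```java
// Hypothetical helper for illustration only; not part of the extension jar.
final class DiscomfortIndexCheck {

    private DiscomfortIndexCheck() {
        // Static helper, not meant to be instantiated.
    }

    // Same expression as in DiscomfortIndexAggregator, taking plain doubles.
    static double discomfortIndex(double temperature, double humidity) {
        return 0.81 * temperature
                + 0.01 * humidity * (0.99 * temperature - 14.3)
                + 46.3;
    }

    public static void main(String[] args) {
        // Sample inputs are assumptions chosen for the example:
        // 25.0 for temperature and 50.0 for humidity.
        System.out.println(discomfortIndex(25.0, 50.0));
    }
}
```

Running the formula this way only confirms the arithmetic; it does not exercise the extension loading or the bucket deployment that produces the error.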
I built a jar file containing both classes, added it to the classpath at /repository/components/lib, and added a siddhi.extension file under /repository/conf/siddhi. The file contains the following line.
org.wso2.siddhi.extention.aggregator.environment.DiscomfortIndexAggregatorFactory
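I restarted the server after making these changes, and no errors were logged during startup.
However, when I created a bucket with the following query,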
from hiroshimaData
insert into disconfortIndexStream
environment:discomfortIndex(RTC_001_Temp, SHT_001_Humi) as discomfortIndex
the following error was logged.
ERROR {org.wso2.carbon.cep.core.BucketDeployer} - wrong configuration provided for adding HIROSHIMA.xml
org.wso2.carbon.cep.core.exception.CEPConfigurationException: Error in initializing Siddhi backend Runtime,null
at org.wso2.carbon.cep.core.internal.CEPBucket.init(CEPBucket.java:109)
at org.wso2.carbon.cep.core.internal.CEPService.deployBucket(CEPService.java:213)
at org.wso2.carbon.cep.core.internal.CEPService.deployBucket(CEPService.java:174)
at org.wso2.carbon.cep.core.BucketDeployer.deploy(BucketDeployer.java:95)
at org.apache.axis2.deployment.repository.util.DeploymentFileData.deploy(DeploymentFileData.java:136)
at org.apache.axis2.deployment.DeploymentEngine.doDeploy(DeploymentEngine.java:810)
at org.apache.axis2.deployment.repository.util.WSInfoList.update(WSInfoList.java:144)
at org.apache.axis2.deployment.RepositoryListener.update(RepositoryListener.java:377)
at org.apache.axis2.deployment.RepositoryListener.checkServices(RepositoryListener.java:254)
at org.apache.axis2.deployment.RepositoryListener.startListener(RepositoryListener.java:371)
at org.apache.axis2.deployment.scheduler.SchedulerTask.checkRepository(SchedulerTask.java:59)
at org.apache.axis2.deployment.scheduler.SchedulerTask.run(SchedulerTask.java:67)
at org.wso2.carbon.core.deployment.CarbonDeploymentSchedulerTask.runAxisDeployment(CarbonDeploymentSchedulerTask.java:67)
at org.wso2.carbon.core.deployment.CarbonDeploymentSchedulerTask.run(CarbonDeploymentSchedulerTask.java:112)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.NullPointerException
at org.wso2.siddhi.core.query.projector.attribute.processor.AbstractAggregationAttributeProcessor.load(AbstractAggregationAttributeProcessor.java:88)
at org.wso2.siddhi.core.persistence.PersistenceService.restoreRevision(PersistenceService.java:79)
at org.wso2.siddhi.core.persistence.PersistenceService.restoreLastRevision(PersistenceService.java:104)
at org.wso2.siddhi.core.SiddhiManager.restoreLastRevision(SiddhiManager.java:276)
at org.wso2.carbon.cep.siddhi.backend.SiddhiBackEndRuntime.init(SiddhiBackEndRuntime.java:259)
at org.wso2.carbon.cep.core.internal.CEPBucket.init(CEPBucket.java:107)
Could you tell me how to fix this?
Thanks in advance.