0

我有一个关于编写自定义 OutputAttributeProcessor 的问题。我使用 WSO2 CEP 2.1.0 和 siddhi 1.1.0。

我想创建一个自定义的OutputAttributeProcessor,所以我创建了两个java 类,DiscomfortIndexAggregatorFactory 实现了OutputAttributeProcessorFactory 和DiscomfortIndexAggregator 实现了OutputAttributeProcessor。两个类的包是org.wso2.siddhi.extention.aggregator.environment。

两个java程序如下。

DiscomfortIndexAggregatorFactory.java

package org.wso2.siddhi.extention.aggregator.environment;

import org.wso2.siddhi.core.query.projector.attribute.factory.OutputAttributeProcessorFactory;
import org.wso2.siddhi.core.query.projector.attribute.handler.OutputAttributeProcessor;
import org.wso2.siddhi.query.api.definition.Attribute.Type;
import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;

@SiddhiExtension(namespace = "environment", function = "discomfortIndex")
public class DiscomfortIndexAggregatorFactory implements
        OutputAttributeProcessorFactory {

    @Override
    public OutputAttributeProcessor createAggregator(Type type) {
        return new DiscomfortIndexAggregator();
    }

    @Override
    public ProcessorType getProcessorType() {
        return OutputAttributeProcessorFactory.ProcessorType.AGGREGATOR;
    }
}

DiscomfortIndexAggregator.java

package org.wso2.siddhi.extention.aggregator.environment;

import org.wso2.siddhi.core.query.projector.attribute.handler.OutputAttributeProcessor;
import org.wso2.siddhi.query.api.definition.Attribute.Type;

public class DiscomfortIndexAggregator implements OutputAttributeProcessor {

    private static final long serialVersionUID = -5992266303998509661L;

    @Override
    public OutputAttributeProcessor createNewInstance() {
        return new DiscomfortIndexAggregator();
    }

    @Override
    public Type getType() {
        return Type.DOUBLE;
    }

    @Override
    public Object processInEventAttribute(Object obj) {
        double discomfortIndex = -1D;
        if (obj instanceof Object[]) {
            Object[] objArray = (Object[]) obj;
            double temperature = (double) objArray[0];
            double humidity = (double) objArray[1];
            discomfortIndex = 0.81 * temperature + 0.01 * humidity
                    * (0.99 * temperature - 14.3) + 46.3;
        }
        return discomfortIndex;
    }

    @Override
    public Object processRemoveEventAttribute(Object obj) {
        double discomfortIndex = -1D;
        if (obj instanceof Object[]) {
            Object[] objArray = (Object[]) obj;
            double temperature = (double) objArray[0];
            double humidity = (double) objArray[1];
            discomfortIndex = 0.81 * temperature + 0.01 * humidity
                    * (0.99 * temperature - 14.3) + 46.3;
        }
        return discomfortIndex;
    }
}

我创建的 jar 文件包含两个 java 类,将 jar 文件添加到 /repository/components/lib 的类路径中,并添加了位于 /repository/conf/siddhi 的 siddhi.extension 文件。siddhi.extention 的内容如下。

org.wso2.siddhi.extention.aggregator.environment.DiscomfortIndexAggregatorFactory

我在上述配置后重新启动。重启后不输出错误日志。

但是我创建了以下存储桶查询,

from hiroshimaData
insert into disconfortIndexStream
environment:discomfortIndex(RTC_001_Temp, SHT_001_Humi) as discomfortIndex

并输出以下错误日志。

ERROR {org.wso2.carbon.cep.core.BucketDeployer} -  wrong configuration provided for adding HIROSHIMA.xml
org.wso2.carbon.cep.core.exception.CEPConfigurationException: Error in initializing Siddhi backend Runtime,null
        at org.wso2.carbon.cep.core.internal.CEPBucket.init(CEPBucket.java:109)
        at org.wso2.carbon.cep.core.internal.CEPService.deployBucket(CEPService.java:213)
        at org.wso2.carbon.cep.core.internal.CEPService.deployBucket(CEPService.java:174)
        at org.wso2.carbon.cep.core.BucketDeployer.deploy(BucketDeployer.java:95)
        at org.apache.axis2.deployment.repository.util.DeploymentFileData.deploy(DeploymentFileData.java:136)
        at org.apache.axis2.deployment.DeploymentEngine.doDeploy(DeploymentEngine.java:810)
        at org.apache.axis2.deployment.repository.util.WSInfoList.update(WSInfoList.java:144)
        at org.apache.axis2.deployment.RepositoryListener.update(RepositoryListener.java:377)
        at org.apache.axis2.deployment.RepositoryListener.checkServices(RepositoryListener.java:254)
        at org.apache.axis2.deployment.RepositoryListener.startListener(RepositoryListener.java:371)
        at org.apache.axis2.deployment.scheduler.SchedulerTask.checkRepository(SchedulerTask.java:59)
        at org.apache.axis2.deployment.scheduler.SchedulerTask.run(SchedulerTask.java:67)
        at org.wso2.carbon.core.deployment.CarbonDeploymentSchedulerTask.runAxisDeployment(CarbonDeploymentSchedulerTask.java:67)
        at org.wso2.carbon.core.deployment.CarbonDeploymentSchedulerTask.run(CarbonDeploymentSchedulerTask.java:112)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.NullPointerException
        at org.wso2.siddhi.core.query.projector.attribute.processor.AbstractAggregationAttributeProcessor.load(AbstractAggregationAttributeProcessor.java:88)
        at org.wso2.siddhi.core.persistence.PersistenceService.restoreRevision(PersistenceService.java:79)
        at org.wso2.siddhi.core.persistence.PersistenceService.restoreLastRevision(PersistenceService.java:104)
        at org.wso2.siddhi.core.SiddhiManager.restoreLastRevision(SiddhiManager.java:276)
        at org.wso2.carbon.cep.siddhi.backend.SiddhiBackEndRuntime.init(SiddhiBackEndRuntime.java:259)
        at org.wso2.carbon.cep.core.internal.CEPBucket.init(CEPBucket.java:107)

你能告诉我怎么做吗?

先感谢您。

4

0 回答 0