0

WSO2 CEP 自定义函数不能用作聚合函数,而是对每一行数据进行操作。

假设我们在输入流中有 value1 和 value2。

  • 主机:A,值 1:1,值 2:10
  • 主机:A,值 1:2,值 2:20
  • 主机:A,值 1:3,值 2:30
  • 主机:A,值 1:4,值 2:40
  • 主机:A,值 1:5,值 2:50

现在我已经按主机完成了时间批处理和分组。并编写了一个自定义函数来查找 MEDIAN,并且还使用了内置函数 SUM。

但它所做的是,它在分组后对所有 5 行应用 SUM 函数,但自定义函数是逐行调用的。自定义函数类中的 process 方法被逐行调用。因此 SUM 返回 15,但我的 MEDIAN 函数给出的输出为 50,这只是最后一个 value2。

执行计划查询

from inputStream#window.timeBatch(10 sec)
select value1 as value1, value2 as value2, sum(value1) as sumOfValue1, custom:median(value2) as medianOfValue2 group by host
insert into outputStream;

自定义函数 Java 类片段

/**
     * Method called when sending events to process
     *
     * @param obj
     * @return
     */
    @Override
    protected Object process(Object obj) {

我们不能有一个自定义聚合函数,或者如果自定义函数只应该逐行执行???

4

1 回答 1

0

您可以通过编写自定义 OutputAttributeAggregator而不是自定义函数来解决此问题。OutputAttributeAggregator 应该用于此类场景,函数用于单独的行。

于 2014-10-09T12:37:34.860 回答