1

我有两个事件流:源和目标。

资源:

{
"name":"src.testevent",
"version": "0.0.1",
"description": "source test event stream",
"metaData":[{"name":"id","type":"INT"}], 
"correlationData":[],   
"payloadData":[]
}

目的地:

{
"name":"sink.testevent",
"version": "0.0.1",
"description": "sink test event stream",
"metaData":[{"name":"id","type":"int"}], 
"correlationData":[],   
"payloadData":[{"name":"Severity","type":"INT"}]
}

执行计划如下:

<?xml version="1.0" encoding="UTF-8"?>
<executionPlan name="testexecplan"
 statistics="disable" trace="enable"
 xmlns="http://wso2.org/carbon/eventprocessor">

  <description>Execution plan for testing</description>
  <siddhiConfiguration>
    <property name="siddhi.enable.distributed.processing">false</property>
    <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
  </siddhiConfiguration>
  <importedStreams>
    <stream as="srcstream" name="src.testevent" version="0.0.1"/>
  </importedStreams>
  <queryExpressions><![CDATA[

// Create temporary stream
define stream tmpstream (id int, Severity int);

// Create some events at tmpstream
from srcstream as a
select a.meta_id as id, 0 as Severity
insert into tmpstream;

from srcstream as a
select a.meta_id as id, 1 as Severity
insert into tmpstream;

from srcstream as a
select a.meta_id as id, 2 as Severity
insert into tmpstream;

// Move last event from temporary stream to sink stream
from tmpstream#window.Length(1) as a
select a.id as meta_id, a.Severity
insert into sinkstream;

]]></queryExpressions>
  <exportedStreams>
    <stream name="sink.testevent" valueOf="sinkstream" version="0.0.1"/>
  </exportedStreams>
</executionPlan>

我需要将最后一个事件从 tmpstream 传递给 sinkstream。我想我需要使用长度(1)的窗口。当我尝试事件模拟器(属性为 112233)时,我收到了接收器事件流中包含三个事件的回复:

12:22:29,290 [-] [http-nio-9443-exec-29]  INFO TenantId=-1234 : Event Processor : testexecplan,src.testevent:0.0.1 (srcstream), before processing 
[112233]
12:22:29,297 [-] [http-nio-9443-exec-29]  INFO TenantId=-1234 : Event Processor : testexecplan,sink.testevent:0.0.1 (sinkstream), after processing 
[Event{streamId='sinkstream', timeStamp=1399537349292, data=[112233, 2], type=new}]
12:22:29,297 [-] [http-nio-9443-exec-29]  INFO TenantId=-1234 : Event Processor : testexecplan,sink.testevent:0.0.1 (sinkstream), after processing 
[Event{streamId='sinkstream', timeStamp=1399537349292, data=[112233, 1], type=new}]
12:22:29,297 [-] [http-nio-9443-exec-29]  INFO TenantId=-1234 : Event Processor : testexecplan,sink.testevent:0.0.1 (sinkstream), after processing 
[Event{streamId='sinkstream', timeStamp=1399537349292, data=[112233, 0], type=new}]

即处理后的三个事件。

我究竟做错了什么?

预先感谢您的回复。

4

1 回答 1

1

长度窗口将简单地保留指定的最新事件数量并过期旧事件。在您的情况下,由于窗口大小为 1,它将保留最后一个事件并使其他事件过期。

我认为您正在寻找的是输出速率限制 [1],它可以根据时间或长度输出窗口的最后一个事件。类似下面的查询而不是您的最后一个查询可能对您有用。

from tmpstream 
select id, Severity 
output last every 3 events
insert into sinkstream;
于 2014-05-10T14:28:32.410 回答