1

我正在使用嵌入在 WSO2 DAS 中的 WSO2 CEP 用于以下执行计划

@Import('RelatedStream:1.0.0')
define stream rs (ID string, product string, uc1 string, state string, brand string, model string, type string, tweet string, rtID string);

@Import('InputStream:1.0.0')
define stream ins (ID string, product string, uc1 string, state string, brand string, model string, type string, tweet string);



@Export('matchingStream:1.0.0')
define stream ms (rID string, rproduct string, ruc1 string, rstate string, rbrand string, rmodel string, rtype string, hID string, hproduct string, huc1 string, hstate string, hbrand string, hmodel string, htype string);

from ins#window.time(2 sec) as R 
  join rs#window.length(1) as H
  on R.product == H.product and R.brand==H.brand and R.type==H.type and R.model==H.model and R.state!=H.state and R.ID==H.rtID
select R.ID as rID, R.product as rproduct , R.uc1 as ruc1 , R.state as rstate, R.brand as rbrand , R.model as rmodel , R.type as rtype ,H.ID as hID , H.product as hproduct , H.uc1 as huc1 , H.state as hstate , H.brand as hbrand, H.model as hmodel , H.type as htype
insert all events into ms;

每个输出事件产生两次相同的值

事件消息跟踪器日志如下

流一(InputStream)

08:23:21,845 [-] [DataBridge-Core-pool-1-thread-3]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : InputStream:1.0.0 (ins), before processing _Event{timestamp=1464144801440, data=[27, phone, g1, sell, samsung, galaxy note, type, Use UM10 to get 10% OFF #unlockyourphone Galaxy Note 6 will reportedly be the first Samsung phone to feature US... sell], isExpired=false} (Sanitized)

流二(RelatedStream)

08:23:21,974 [-] [DataBridge-Core-pool-1-thread-2]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : RelatedStream:1.0.0 (rs), before processing _Event{timestamp=1464144801943, data=[5, phone, g1, sell, samsung, iphone 4s, type, White 3 USB Port Car Charger Adapter For iPhone 4S 5S 5C 6 6S iPad Samsung Phone - Bid Now? sell, 27], isExpired=false} (Sanitized)
08:23:21,998 [-] [DataBridge-Core-pool-1-thread-4]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : RelatedStream:1.0.0 (rs), before processing _Event{timestamp=1464144801944, data=[11, phone, g1, buy, samsung, galaxy s4, type, #cellular #deals  Samsung Galaxy S4 SCH-I545 16GB Verizon AT&T GSM UNLOCKED Cell Phone RF buy, 27], isExpired=false} (Sanitized)
08:23:22,030 [-] [DataBridge-Core-pool-1-thread-5]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : RelatedStream:1.0.0 (rs), before processing _Event{timestamp=1464144801944, data=[13, phone, g1, sell, samsung, galaxy note, type, UNLOCKED T-Mobile Samsung Galaxy Note 3 SM-N900T 4G LTE GSM 32GB Smart Phone  sell, 27], isExpired=false} (Sanitized)
08:23:22,031 [-] [DataBridge-Core-pool-1-thread-5]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : RelatedStream:1.0.0 (rs), before processing _Event{timestamp=1464144801944, data=[18, phone, g1, sell, samsung, galaxy s6, type, Cell Phones : New Samsung Galaxy S6 Edge SM-G925F 5.1'' 16MP (FACTORY UNLOCKED) 32GB Phone  sell, 27], isExpired=false} (Sanitized)
08:23:22,031 [-] [DataBridge-Core-pool-1-thread-5]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : RelatedStream:1.0.0 (rs), before processing _Event{timestamp=1464144801944, data=[19, phone, g1, sell, samsung, galaxy s4, type, #cellular #deals  Samsung i545 Galaxy S4 16GB Verizon 13MP Camera WiFi Cell Phone sell, 27], isExpired=false} (Sanitized)
08:23:22,031 [-] [DataBridge-Core-pool-1-thread-5]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : RelatedStream:1.0.0 (rs), before processing _Event{timestamp=1464144801944, data=[1, phone, g1, sell, samsung, galaxy s6, type, Cell Phone USA : New Samsung Galaxy S6 Edge SM-G925F 5.1'' 16MP (FACTORY UNLOCKED) 32GB Ph?  sell, 27], isExpired=false} (Sanitized)
08:23:22,033 [-] [DataBridge-Core-pool-1-thread-5]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : RelatedStream:1.0.0 (rs), before processing _Event{timestamp=1464144801944, data=[14, phone, g1, sell, samsung, galaxy note, type, Galaxy Note 6 will reportedly be the first Samsung phone to feature USB-C -  sell, 27], isExpired=false} (Sanitized)
08:23:22,034 [-] [DataBridge-Core-pool-1-thread-5]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : RelatedStream:1.0.0 (rs), before processing _Event{timestamp=1464144801945, data=[16, phone, g1, buy, samsung, galaxy note, type, #unlocksquare Galaxy Note 6 will reportedly be the first Samsung phone to feature USB-C  buy, 27], isExpired=false} (Sanitized)
08:23:22,035 [-] [DataBridge-Core-pool-1-thread-5]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : RelatedStream:1.0.0 (rs), before processing _Event{timestamp=1464144801945, data=[17, phone, g1, buy, samsung, galaxy s5, type, Cell Phone USA : NEW T-Mobile Protective Cover/Holster For Samsung Galaxy S5 Case Kickstan?  buy, 27], isExpired=false} (Sanitized)
08:23:22,035 [-] [DataBridge-Core-pool-1-thread-5]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : RelatedStream:1.0.0 (rs), before processing _Event{timestamp=1464144801945, data=[15, phone, g1, buy, samsung, galaxy s6, type, Cell Phone USA : Samsung Galaxy S6 SM-G920F 32GB Unlocked 16MP Smartphone  #4422  buy, 27], isExpired=false} (Sanitized)

输出流(matchingStream)

08:23:22,041 [-] [Siddhi-R_H_Match-executor-thread-1]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : matchingStream:1.0.0 (ms), after processing _[Event{timestamp=1464144801945, data=[27, phone, g1, sell, samsung, galaxy note, type, 16, phone, g1, buy, samsung, galaxy note, type], isExpired=false}] (Sanitized)
08:23:22,120 [-] [Siddhi-R_H_Match-executor-thread-1]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : matchingStream:1.0.0 (ms), after processing _[Event{timestamp=1464144802037, data=[27, phone, g1, sell, samsung, galaxy note, type, 16, phone, g1, buy, samsung, galaxy note, type], isExpired=false}] (Sanitized)
4

1 回答 1

1

当您将所有事件插入到输出事件流中时,它包括当前事件(传入事件)和过期事件(在超时或超出窗口长度时由窗口发出)。因此,有可能获得重复。

如果您的要求是使用“rs”流的传入事件触发输出,您可以尝试以下操作:(使用当前事件)

from ins#window.time(2 sec) as R 
  join rs#window.length(0) as H
  on R.product == H.product and R.brand==H.brand and R.type==H.type and R.model==H.model and R.state!=H.state and R.ID==H.rtID
select R.ID as rID, R.product as rproduct , R.uc1 as ruc1 , R.state as rstate, R.brand as rbrand , R.model as rmodel , R.type as rtype ,H.ID as hID , H.product as hproduct , H.uc1 as huc1 , H.state as hstate , H.brand as hbrand, H.model as hmodel , H.type as htype
insert current events into ms;
于 2016-05-25T05:40:08.483 回答