0

output every将子句与表子句一起使用时,我遇到了意外的行为join

  • 我有一个基本的应用程序,有一个input流和 2 个表,它们存储不同的值列表。然后,还有2个查询,
  • 第一个query1将加入table1,当有匹配时output first every 5 sec
  • Secondquery2将执行类似的操作,将 join table2,并将每 5 秒输出找到的第一个值。
  • 这样做的目标是,每 5 秒,当input流中有一个值包含在表 1 中时,就会有一个匹配,如果有一个值包含在表 2 中,就会有一个不同的匹配,并且两者查询将保持沉默,直到下一个 5 秒块。

该应用程序如下

@App:name("delays_tables_join")

define stream input(value string);
define stream table_input(value string);
define table table1(value string);
define table table2(value string);

@sink(type='log')
define stream LogStream (value string);

-- fill table1
@info(name='insert table 1')
from table_input[value == '1']
insert into table1;

-- fill table2
@info(name='insert table 2')
from table_input[value == '2']
insert into table2;

-- query input join with table 1, output once every 5 sec
@info(name='query1')
from input join table1 on input.value == table1.value
select input.value
output first every 5 sec
insert into LogStream;

-- query input join with table 2, output once every 5 sec
@info(name='query2')
from input join table2 on input.value == table2.value
select input.value
output first every 5 sec
insert into LogStream;
  • 运行此应用程序时,首先将其发送到table_input1,并2填充两个表

  • 然后,它开始向输入流重复发送值:1, 2, 1, 2, 1, 2...

  • 预计LogStream每 5 秒有 2 个值,第一次出现1value ,第一次出现 value 2

  • 但是相反,只有第一次出现的 value1一直出现,而不是 value2

[2020-04-02_18-55-16_498] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays_tables_join : LogStream : Event{timestamp=1585846516098, data=[1], isExpired=false} 
[2020-04-02_18-55-21_508] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays_tables_join : LogStream : Event{timestamp=1585846521098, data=[1], isExpired=false} 

请注意,当不涉及表连接时,两个查询都按预期工作。没有连接的例子:

@App:name("delays")
define stream Input(value string);

@sink(type='log')
define stream LogStream (value string);

@info(name='query1')
from Input[value == '1']
select value
output first every 5 sec
insert into LogStream;

@info(name='query2')
from Input[value == '2']
select value
output first every 5 sec
insert into LogStream;

这将产生以下输出:

[2020-04-02_18-53-50_305] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays : LogStream : Event{timestamp=1585846430304, data=[1], isExpired=false} 
[2020-04-02_18-53-50_706] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays : LogStream : Event{timestamp=1585846430305, data=[2], isExpired=false} 
[2020-04-02_18-53-55_312] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays : LogStream : Event{timestamp=1585846438305, data=[1], isExpired=false} 
[2020-04-02_18-53-56_114] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays : LogStream : Event{timestamp=1585846439305, data=[2], isExpired=false}

.

我想知道这种行为是否是预期的,或者应用程序的设计中是否存在任何错误。

非常感谢!

4

1 回答 1

0

通过在插入行中将table1更改为table2来修复“插入表 2”查询,我能够获得“无连接”中的结果

-- fill table2
@info(name='insert table 2')
from table_input[value == '2']
insert into table1;
于 2020-06-18T15:20:55.717 回答