0

我需要将多个事件收集到一个事件中并将其视为一个事件。输出流应包含输入事件作为列表。

基本上,当通过以下事件时

{InvoiceNumber:1, LineItem: 1, Value: 100}
{InvoiceNumber:1, LineItem: 2, Value: 200}
{InvoiceNumber:1, LineItem: 3, Value: 300}

我需要输出如下

[InvoiceNumber:1, lineItems: [{LineItem: 1, Value: 100}, {LineItem: 2, Value: 200}, {LineItem: 3, Value: 300}]

如何使用 WSO2 Streaming Integrator 实现这一目标?或 Siddhi.io。

我尝试了以下操作,但它仍然将每个流插入到输出流中

partition with (InvoiceNo of CSVInputStream)
begin

    from every e1=CSVInputStream
        within 1 min
    select e1.InvoiceNo, list:collect(e1.LineItem) as lineItems
    group by e1.InvoiceNo
    insert into AggregatedStream;
end;
4

1 回答 1

0

不要使用分区,因为这是一个简单的用例,请尝试 windows。您的情况下的时间批处理窗口,https://siddhi.io/en/v5.1/docs/api/latest/#timebatch-window

from CSVInputStream#window.timeBatch(1 min)
select e1.InvoiceNo, list:collect(e1.LineItem) as lineItems
group by e1.InvoiceNo
insert into AggregatedStream;
于 2020-09-16T12:09:18.877 回答