2

我正在尝试使用通过“http-response”响应接收器填充的附加属性来丰富我的输入流。

我尝试使用带有 window 属性的“join”和“every”关键字来合并两个流并将生成的合并流插入另一个流以丰富它。

当传入事件以 1 秒或更长的时间间隔出现时,窗口属性(window.time(1 sec) 或 window.length(1))和“every”关键字效果很好。

当(例如 10 或 100 个)事件同时(在一秒钟内)发送时。然后合并的结果不是预期的。

具有“window”属性的那个(join)

**

from EventInputStreamOne#window.time(1 sec) as i
        join EventInputStreamTwo as s
        on i.variable2 == s.variable2
select i.variable1 as variable1, i.variable2 as variable2, s.variable2 as variable2
insert into EventOutputStream;

**

带有“every”关键字的那个

**

from every e1=EventInputStream,e2=EventResponseStream
select e1.variable1 as variable1, e1.variable2 as variable2, e2.variable3 as variable3
insert into EventOutputStream;

**

有没有更好的方法来合并两个流以更新第三个流?

4

2 回答 2

1

要获取原始请求属性,可以使用自定义映射如下,

@source(type='http-call-response', sink.id='source-1'
       @map(type='json',@attributes(name='name', id='id', volume='trp:volume', price='trp:price')))
define stream responseStream(name String, id int, headers String, volume long, price float);

在这里,可以使用 访问请求属性trp:attributeName,在此示例中,只有名称来自响应,价格和数量来自请求。

于 2019-10-02T04:32:50.243 回答
-1

您的“每个”关键字方法中的语法不太正确。你有没有尝试过这样的事情:

from every (e1 = event1) -> e2=event2[e1.variable == e2.variable]
select e1.variable1, e2.variable1, e2.variable2
insert into outputEvent;

本文档可能会有所帮助。

于 2019-10-11T13:33:50.707 回答