有谁知道如何使用 Faust 实现滑动窗口?
这个想法是计算一个键在 10、30、60 和 300 秒窗口中的出现次数,但我们需要在 1 秒或每次更新的基础上进行计数。
我有一个狡猾的解决方法,这似乎非常低效,我有一个翻滚的 1 秒窗口,到期时间为 300 秒,然后我使用该delta()
方法将表中的所有旧值与当前值相加。它似乎可以处理来自 6 个源的消息,每个源以 10 条消息/秒的速度运行,但这大约是我们看到滞后之前的限制。这显然是一种无法扩展的缓慢方法,所以问题是如何在不需要 KSQL 或设置 Spark 集群以及 Kafka 集群的情况下实现这一点。如果可以,我们会尽量保持简单。
更复杂的是,我们非常希望在过去 24 小时、1 周、1 个月和过去 3 个月内拥有相同的统计数据……所有这些都在运行中。但也许我们只是要求太多,而没有为每个输入提供专门的流程。
这是我的狡猾代码:
class AlarmCount(faust.Record, serializer='json'):
event_id: int
source_id: int
counts_10: int
counts_30: int
counts_60: int
counts_300: int
@app.agent(events_topic)
async def new_event(stream):
async for value in stream:
# calculate the count statistics
counts_10=0
counts_30=0
counts_60=0
counts_300=0
event_counts_table[value.global_id] += 1
for i in range(300):
if(i<=10):
counts_10+=event_counts_table[value.source_id].delta(i)
if(i<=30):
counts_30+=event_counts_table[value.source_id].delta(i)
if(i<=60):
counts_60+=event_counts_table[value.source_id].delta(i)
if(i<=300):
counts_300+=event_counts_table[value.source_id].delta(i)
await event_counts_topic.send(
value=EventCount(
event_id=value.event_id,
source_id=value.source_id,
counts_10=counts_10,
counts_30=counts_30,
counts_60=counts_60,
counts_300=counts_300
)
)