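Steps to reproduce
I am trying to aggregate some data in a tumbling window and then apply a processing function to the data in that window. I am using the expires parameter to handle late events (assume that events belonging to minute n can still arrive during the first 10 seconds of minute n+1).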
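import logging
from datetime import datetime, timedelta

import faust
from faust.models.fields import DatetimeField

logger = logging.getLogger(__name__)

# Assumed app definition (the report does not show it); app id and broker URL are placeholders.
app = faust.App('tumbling-window-app', broker='kafka://localhost:9092')


def parse_millis(ms):
    # Input timestamps are epoch milliseconds.
    return datetime.fromtimestamp(int(ms) / 1000)


def process_window_function(window_info, values: list):
    # window_info[1] holds the (start, end) bounds of the closed window as epoch seconds.
    logger.info(f"Processing window with "
                f"start = {datetime.fromtimestamp(window_info[1][0])}, "
                f"end = {datetime.fromtimestamp(window_info[1][1])}")
    logger.info(values)


class InputClass(faust.Record, coerce=True):
    id: str
    timestamp: datetime = DatetimeField(date_parser=parse_millis)
    value: int


tumbling_window_table = (
    app.Table(
        'tumbling_window_table',
        default=list,
        on_window_close=process_window_function,
    )
    .tumbling(size=60, expires=timedelta(seconds=10))
    .relative_to_field(InputClass.timestamp)
)

input_topic = app.topic("input.topic", value_type=InputClass)


@app.agent(input_topic)
async def process_input(stream):
    event: InputClass
    async for event in stream:
        logger.info(f"Event with timestamp {event.timestamp} is stored in window state")
        # Append the new value to the list stored for this key in the current window.
        list_of_values = tumbling_window_table[event.id].value()
        list_of_values.append(event.value)
        tumbling_window_table[event.id] = list_of_values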
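Expected behavior
I expect process_window_function for window n to be called only after the first 10 seconds of window n+1 have passed, so that late events can still be added to window n.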
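To make the expected semantics concrete, here is a minimal sketch of the closing rule I have in mind. It only illustrates the expectation and is not how Faust decides when to close a window; the names GRACE and may_close are hypothetical, and the timestamps are taken from the sample data in this report (assuming UTC).

```python
from datetime import datetime, timedelta, timezone

WINDOW_SIZE = timedelta(seconds=60)  # matches size=60 on the table
GRACE = timedelta(seconds=10)        # hypothetical name for the 10-second allowance


def window_end(window_start: datetime) -> datetime:
    """Exclusive end of the tumbling window that starts at window_start."""
    return window_start + WINDOW_SIZE


def may_close(window_start: datetime, latest_event_time: datetime) -> bool:
    """True once event time has moved at least GRACE past the end of the window."""
    return latest_event_time >= window_end(window_start) + GRACE


window_n = datetime(2021, 3, 3, 21, 57, 0, tzinfo=timezone.utc)

# An event stamped 21:58:01 is only 1 second into window n+1: window n stays open.
print(may_close(window_n, datetime(2021, 3, 3, 21, 58, 1, tzinfo=timezone.utc)))   # False
# An event stamped 21:58:10 is 10 seconds into window n+1: window n may close.
print(may_close(window_n, datetime(2021, 3, 3, 21, 58, 10, tzinfo=timezone.utc)))  # True
```

Under this rule, the third sample event (21:58:01) should not trigger process_window_function for the 21:57 window.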
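Actual behavior
If the Table's expires parameter is smaller than the size parameter, process_window_function for window n is called immediately after the first event of window n+1 arrives. It looks like Faust simply ignores expires. As a result, late events that arrive slightly afterwards are dropped from window n.
If the expires parameter is equal to or greater than size, late events are handled correctly, but I do not want to delay processing by more than 10 seconds.
Kafka input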
{"id":"sensor-1","timestamp":1614808641000,"value":1}
{"id":"sensor-1","timestamp":1614808677000,"value":2}
{"id":"sensor-1","timestamp":1614808681000,"value":3}
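For reference, the following standalone sketch shows which 60-second window each of the three sample events falls into. It assumes windows are aligned to multiples of 60 seconds since the epoch and that timestamps are read as UTC; it does not use Faust, and the helper names are hypothetical.

```python
import json
from datetime import datetime, timezone

WINDOW_SIZE_SECONDS = 60

RAW_EVENTS = [
    '{"id":"sensor-1","timestamp":1614808641000,"value":1}',
    '{"id":"sensor-1","timestamp":1614808677000,"value":2}',
    '{"id":"sensor-1","timestamp":1614808681000,"value":3}',
]


def window_start(timestamp_ms: int) -> datetime:
    """Round an epoch-millisecond timestamp down to the start of its window."""
    seconds = timestamp_ms // 1000
    aligned = seconds - seconds % WINDOW_SIZE_SECONDS
    return datetime.fromtimestamp(aligned, tz=timezone.utc)


def group_by_window(raw_events):
    """Collect event values per window start, in arrival order."""
    windows = {}
    for raw in raw_events:
        event = json.loads(raw)
        windows.setdefault(window_start(event["timestamp"]), []).append(event["value"])
    return windows


for start, values in group_by_window(RAW_EVENTS).items():
    print(start.strftime("%H:%M:%S"), values)
# 21:57:00 [1, 2]
# 21:58:00 [3]
```

So the first two events belong to the 21:57 window, and the third event is the first one of the 21:58 window.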
Logs
[2021-03-03 21:58:07,510] [1] [INFO] [^Worker]: Ready
[2021-03-03 21:58:41,955] [1] [INFO] Event with timestamp 2021-03-03 21:57:21 is stored in window state
[2021-03-03 21:59:00,574] [1] [INFO] Event with timestamp 2021-03-03 21:57:57 is stored in window state
[2021-03-03 21:59:16,963] [1] [INFO] Event with timestamp 2021-03-03 21:58:01 is stored in window state
[2021-03-03 21:59:16,987] [1] [INFO] Processing window with start = 2021-03-03 21:57:00, end = 2021-03-03 21:57:59.900000
[2021-03-03 21:59:16,988] [1] [INFO] [1, 2]
Versions
- Python version
3.7.9
- Faust version
faust-streaming==0.6.1
- RocksDB version
python-rocksdb
I can achieve this behavior in Flink, but I ran into this problem in Faust. What am I doing wrong?