1

重现步骤

我试图在翻滚窗口中聚合一些数据,然后将处理函数应用于窗口中的数据。我正在使用expires 参数来处理迟到的事件 (假设我们可以在n+1分钟的前 10 秒内获得属于n分钟的事件)。

def parse_millis(ms):
    return datetime.fromtimestamp(int(ms) / 1000)


def process_window_function(window_info, values: list):
    logger.info(f"Processing window with "
                f"start = {datetime.fromtimestamp(window_info[1][0])}, "
                f"end = {datetime.fromtimestamp(window_info[1][1])}")
    logger.info(values)


class InputClass(faust.Record, coerce=True):
    id: str
    timestamp: datetime = DatetimeField(date_parser=parse_millis)
    value: int


tumbling_window_table = (
    app.Table(
        'tumbling_window_table',
        default=list,
        on_window_close=process_window_function,
    )
        .tumbling(size=60, expires=timedelta(seconds=10))
        .relative_to_field(InputClass.timestamp)
)

input_topic = app.topic("input.topic", value_type=InputClass)


@app.agent(input_topic)
async def process_input(stream):
    event: InputClass
    async for event in stream:
        logger.info(f"Event with timestamp {event.timestamp} is stored in window state")
        list_of_values = tumbling_window_table[event.id].value()
        list_of_values.append(event.value)
        tumbling_window_table[event.id] = list_of_values

预期行为

我希望仅当传递n+1窗口的 10 秒以处理延迟事件时才调用n窗口的 process_window_function

实际行为

如果 Table 的expires参数小于size参数,则窗口n的 process_window_function将在窗口n+1的第一个事件之后立即调用。看起来浮士德只是忽略了expires。对于这种行为 ,可能会稍晚到达的迟到的事件将被跳过。

如果 expires 参数等于或大于大小,迟到的事件将得到正确处理,但我不希望延迟超过 10 秒。

卡夫卡输入

{"id":"sensor-1","timestamp":1614808641000,"value":1}
{"id":"sensor-1","timestamp":1614808677000,"value":2}
{"id":"sensor-1","timestamp":1614808681000,"value":3}

日志

[2021-03-03 21:58:07,510] [1] [INFO] [^Worker]: Ready 
[2021-03-03 21:58:41,955] [1] [INFO] Event with timestamp 2021-03-03 21:57:21 is stored in window state 
[2021-03-03 21:59:00,574] [1] [INFO] Event with timestamp 2021-03-03 21:57:57 is stored in window state 
[2021-03-03 21:59:16,963] [1] [INFO] Event with timestamp 2021-03-03 21:58:01 is stored in window state 
[2021-03-03 21:59:16,987] [1] [INFO] Processing window with start = 2021-03-03 21:57:00, end = 2021-03-03 21:57:59.900000 
[2021-03-03 21:59:16,988] [1] [INFO] [1, 2] 

版本

  • 蟒蛇版本3.7.9
  • 浮士德版本faust-streaming==0.6.1
  • RocksDB 版本python-rocksdb

我有可能在 Flink 中实现这种行为,但在 Faust 中遇到了这个问题。我究竟做错了什么?

4

1 回答 1

0

这可能是我遇到的同样问题,如果是这样,这可能是解决方案。我必须clean_up_interval手动设置,因为它默认为 30 秒。此属性是检查过期表数据之前的时间。

您可以通过app.conf.table_clean_up_interval = <time as int or float>在以典型app = faust.App()方式定义您的应用程序后进行设置来完成。

您可以在settings.py文件和一个工作示例(最近可能更改?)中找到方法。

唯一的问题似乎是,如果应用程序崩溃(或重新平衡??),on_window_close 似乎不会正确触发 - 就像在没有工作人员观看时过期的窗口永远消失了,你永远不知道他们。但是我还没有使用 RocksDB,只是在内存中,所以也许还有更多你可以帮助的东西?

我仍在努力了解迟到的事件,因为我正在使用相同的过程来执行非常长的聚合(例如 3 个月,间隔为 1 秒),但我无法弄清楚是否将非常旧的数据放入与它的时间戳匹配的窗口或仅与当前窗口匹配的窗口。我认为它会根据与窗口的时间戳关系将其放入正确的窗口中,但无法确认。

于 2021-09-28T06:36:14.887 回答