I created a test application to understand the on-window-close callback of a Faust tumbling table, but I ran into an issue when running it. The application has two timer tasks. Each one sends a message whose key is a string and whose value is a Faust Record with these attributes:
item (always 1)
date (current time in seconds)
Each producer sends its messages under one particular key. There is also an agent that consumes these messages, merges each one with the existing record, if available, and stores the result in the tumbling table. I have also configured a function as the callback for window close. The issue is that, for each window, the messages emitted to the callback come from only one partition.

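Note that when the messages are sent to a single partition, I do not see any problem: both messages are emitted as expected. I only see the issue when the messages are sent to different partitions.
To illustrate this, I have pasted my application's code and its logs below.

As the logs show, every 5 seconds two messages, with keys location_10 and location_11, are sent to the topic on predefined partitions (0 and 1 respectively). These messages are consumed by the agent function print_windowed_events, which also updates the table. The logs make it clear that no messages are lost during consumption. However, the on-close callback logs only one key per window.

Please advise.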


from datetime import timedelta
from time import time
import logging

import faust

log = logging.getLogger("myapp")

class RawModel(faust.Record):
    item: int = 0
    date: float = float(1)


TOPIC = 'raw-event_14'
#SINK = 'agg-event'
TABLE = 'tumbling_table'
KAFKA = 'kafka://localhost:9092'
CLEANUP_INTERVAL = 1.0
WINDOW = 20
WINDOW_EXPIRES = 10
PARTITIONS = 4

app = faust.App('windowed-agg-20', broker=KAFKA, version=1, store='rocksdb://', topic_partitions=4, producer_linger_ms=20, producer_acks=1)

app.conf.table_cleanup_interval = CLEANUP_INTERVAL
source = app.topic(TOPIC, value_type=RawModel, partitions=4)


def window_processor(key, events):
    # key is (table_key, (window_start, window_end)); events is the value
    # stored for that key when the window closes.
    log.warning(f'processing window: key {key}, dump {events.dumps()}')

tumbling_table = (
    app.Table(
        TABLE,
        default=RawModel,
        partitions=PARTITIONS,
        on_window_close=window_processor,
        key_type=str,
        value_type=RawModel,
    )
    .tumbling(WINDOW, expires=timedelta(seconds=WINDOW_EXPIRES))
    .relative_to_field(RawModel.date)
)


@app.agent(source)
async def print_windowed_events(stream):
    #async for event in stream:
    async for key, event in stream.items():
        windowSet = tumbling_table.get(key, None)
        raw_model = windowSet.value()
        event.item += raw_model.item
        tumbling_table[key] = event
        log.warning(f"consumer::: {key},value: {event.item} date ::: {event.date}")



@app.timer(5)
async def produce0():
    location_Id = "location_10"
    raw_model = RawModel(item=1, date=float(time()))
    log.warning(f"produce::: {location_Id} value: {raw_model.item} date :: {raw_model.date}")
    await source.send(key=location_Id, partition=0, value=raw_model)

@app.timer(5)
async def produce1():
    location_Id = "location_11"
    raw_model = RawModel(item=1, date=float(time()))
    log.warning(f"produce::: {location_Id} value: {raw_model.item} date :: {raw_model.date}")
    await source.send(key=location_Id, partition=1, value=raw_model)

if __name__ == '__main__':
    app.main()

[2021-07-14 19:18:38,316] [92170] [WARNING] produce::: location_10 value: 1 date :: 1626270518.316847 
[2021-07-14 19:18:38,318] [92170] [WARNING] produce::: location_11 value: 1 date :: 1626270518.318645 
[2021-07-14 19:18:38,327] [92170] [WARNING] consumer::: b'location_11',value: 1 date ::: 1626270518.318645 
[2021-07-14 19:18:38,330] [92170] [WARNING] consumer::: b'location_10',value: 1 date ::: 1626270518.316847 
[2021-07-14 19:18:43,317] [92170] [WARNING] produce::: location_11 value: 1 date :: 1626270523.317833 
[2021-07-14 19:18:43,319] [92170] [WARNING] produce::: location_10 value: 1 date :: 1626270523.319649 
[2021-07-14 19:18:43,326] [92170] [WARNING] consumer::: b'location_11',value: 1 date ::: 1626270523.317833 
[2021-07-14 19:18:43,345] [92170] [WARNING] consumer::: b'location_10',value: 1 date ::: 1626270523.319649 
[2021-07-14 19:18:44,037] [92170] [WARNING] processing window: key (b'location_11', (1626270500.0, 1626270519.9)), dump {'date': 1626270518.318645, 'value': 1, '__faust': {'ns': 'HelloWorld.RawModel'}} 
[2021-07-14 19:18:48,323] [92170] [WARNING] produce::: location_11 value: 1 date :: 1626270528.32321 
[2021-07-14 19:18:48,323] [92170] [WARNING] produce::: location_10 value: 1 date :: 1626270528.3238668 
[2021-07-14 19:18:48,330] [92170] [WARNING] consumer::: b'location_11',value: 2 date ::: 1626270528.32321 
[2021-07-14 19:18:48,334] [92170] [WARNING] consumer::: b'location_10',value: 2 date ::: 1626270528.3238668 
[2021-07-14 19:18:53,325] [92170] [WARNING] produce::: location_11 value: 1 date :: 1626270533.325123 
[2021-07-14 19:18:53,326] [92170] [WARNING] produce::: location_10 value: 1 date :: 1626270533.326198 
[2021-07-14 19:18:53,339] [92170] [WARNING] consumer::: b'location_11',value: 3 date ::: 1626270533.325123 
[2021-07-14 19:18:53,343] [92170] [WARNING] consumer::: b'location_10',value: 3 date ::: 1626270533.326198 
[2021-07-14 19:18:58,329] [92170] [WARNING] produce::: location_10 value: 1 date :: 1626270538.329877 
[2021-07-14 19:18:58,330] [92170] [WARNING] produce::: location_11 value: 1 date :: 1626270538.330341 
[2021-07-14 19:18:58,335] [92170] [WARNING] consumer::: b'location_11',value: 4 date ::: 1626270538.330341 
[2021-07-14 19:18:58,340] [92170] [WARNING] consumer::: b'location_10',value: 4 date ::: 1626270538.329877 
[2021-07-14 19:19:03,331] [92170] [WARNING] produce::: location_11 value: 1 date :: 1626270543.331743 
[2021-07-14 19:19:03,332] [92170] [WARNING] produce::: location_10 value: 1 date :: 1626270543.332333 
[2021-07-14 19:19:03,340] [92170] [WARNING] consumer::: b'location_11',value: 1 date ::: 1626270543.331743 
[2021-07-14 19:19:03,343] [92170] [WARNING] consumer::: b'location_10',value: 1 date ::: 1626270543.332333 
[2021-07-14 19:19:04,082] [92170] [WARNING] processing window: key (b'location_11', (1626270520.0, 1626270539.9)), dump {'date': 1626270538.330341, 'value': 4, '__faust': {'ns':  'HelloWorld.RawModel'}}
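
To make the expected behaviour concrete, here is a minimal plain-Python sketch (no Faust or Kafka involved) that groups the produced timestamps from the log above into 20-second tumbling windows. The helpers `window_range` and `expected_closed_windows` are hypothetical names used only for this illustration, and the `- 0.1` end offset is an assumption inferred from the ranges printed in the log, not something taken from Faust's source.

```python
from collections import defaultdict

WINDOW = 20  # seconds, same size as the tumbling window in the app above


def window_range(timestamp, size=WINDOW):
    """Return the (start, end) range of the tumbling window containing timestamp.

    Assumption: end = start + size - 0.1, inferred from the logged ranges.
    """
    start = timestamp - (timestamp % size)
    return start, start + size - 0.1


def expected_closed_windows(events, size=WINDOW):
    """Group (key, timestamp, item) events by window and sum the items per key."""
    windows = defaultdict(dict)
    for key, timestamp, item in events:
        totals = windows[window_range(timestamp, size)]
        totals[key] = totals.get(key, 0) + item
    return dict(windows)


# Timestamps copied from the "produce" lines of the log (first two windows).
events = [
    ("location_10", 1626270518.316847, 1),
    ("location_11", 1626270518.318645, 1),
    ("location_11", 1626270523.317833, 1),
    ("location_10", 1626270523.319649, 1),
    ("location_11", 1626270528.32321, 1),
    ("location_10", 1626270528.3238668, 1),
    ("location_11", 1626270533.325123, 1),
    ("location_10", 1626270533.326198, 1),
    ("location_10", 1626270538.329877, 1),
    ("location_11", 1626270538.330341, 1),
]

for (start, end), totals in sorted(expected_closed_windows(events).items()):
    print(start, end, totals)
```

Under these assumptions, every window contains both location_10 and location_11, which is what I expected the window-close callback to report. In the actual log, only location_11 is reported for each window.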