I created a test application to understand the window-close callback (`on_window_close`) of a tumbling table, but I ran into an issue when running it. In my test application, I have two timer tasks, each sending messages whose key is a string and whose value is a Faust Record with the following attributes:
item (always 1)
date (current time in seconds)
Each producer sends messages for one particular key. There is also an agent that consumes these messages, merges each one with the existing record (if available), and stores the result in the tumbling table. I have also configured a function as the window-close callback. The issue I see is that, for each window, the callback only receives messages from one partition.
请注意,当我们将消息发送到单个分区时,我没有看到任何问题,两条消息都可以正常发出。只有当消息被发送到不同的分区时,我才会看到这个问题。
为了说明这一点,我在下面粘贴了应用程序的代码片段和日志。
从日志中可以看到,每 5 秒会有两种消息(键分别为 location_10 和 location_11)被发送到主题的预定义分区(0 和 1)。这些消息由函数 print_windowed_events 消费,并同时更新表。从日志中可以清楚地看出,消费这些消息时没有消息丢失。但是窗口关闭时,回调函数在每个窗口中只记录了一个键。
请指教
from datetime import datetime, timedelta
from time import time
import random
import faust
import logging
# Name of a log file; not referenced anywhere in the visible code.
LOG_FILENAME = "logfile.log"
# Logger shared by the window callback, the agent and the timer tasks.
log = logging.getLogger("myapp")
class RawModel(faust.Record):
    """Event payload sent on the source topic and stored in the table."""

    # Count carried by the event; the timer tasks always send 1.
    item: int = 0
    # Event time as a Unix timestamp in seconds (filled from time.time()).
    date: float = 1.0
# Kafka topic that carries the raw events.
TOPIC = 'raw-event_14'
# Name given to the windowed table.
TABLE = 'tumbling_table'
# Broker URL handed to the Faust app.
KAFKA = 'kafka://localhost:9092'
# Value used for the app's table_cleanup_interval setting.
CLEANUP_INTERVAL = 1.0
# Size of each tumbling window, in seconds.
WINDOW = 20
# Seconds passed as the window's ``expires`` argument.
WINDOW_EXPIRES = 10
# Single source of truth for the partition count: used for the app default,
# the source topic and the table, so the three can never drift apart.
PARTITIONS = 4

# All settings are passed to the constructor rather than patched onto
# ``app.conf`` afterwards, so the configuration lives in one place.
app = faust.App(
    'windowed-agg-20',
    broker=KAFKA,
    version=1,
    store='rocksdb://',
    topic_partitions=PARTITIONS,
    producer_linger_ms=20,
    producer_acks=1,
    table_cleanup_interval=CLEANUP_INTERVAL,
)

# Topic the timer tasks publish to and the agent consumes from.
source = app.topic(TOPIC, value_type=RawModel, partitions=PARTITIONS)
def window_processor(key, events):
    """Log the key and serialized value handed over when a window closes.

    Passed to the table as its ``on_window_close`` callback.

    Args:
        key: Key supplied by the table for the closed window. In the logged
            output it has the form ``(table_key, (window_start, window_end))``.
        events: Value stored for that window; must provide ``dumps()``.
    """
    log.warning('processing window: key %s, dump %s', key, events.dumps())
# Table of RawModel values split into tumbling windows of WINDOW seconds,
# created with an expiry of WINDOW_EXPIRES seconds and made relative to the
# ``date`` field of RawModel. ``window_processor`` is registered as the
# window-close callback.
# NOTE(review): key_type is str here, while the source topic declares no
# key_type and the agent logs bytes keys — confirm the two are meant to match.
tumbling_table = app.Table(
    TABLE,
    key_type=str,
    value_type=RawModel,
    default=RawModel,
    partitions=PARTITIONS,
    on_window_close=window_processor,
).tumbling(
    WINDOW,
    expires=timedelta(seconds=WINDOW_EXPIRES),
).relative_to_field(RawModel.date)
@app.agent(source)
async def print_windowed_events(stream):
    """Fold each incoming event into the tumbling table and log the result.

    For every ``(key, event)`` pair on the stream, the ``item`` of the value
    the table currently returns for that key is added to the event's own
    ``item``; the event is then stored back under the same key.
    """
    async for key, event in stream.items():
        # Look up what the table holds for this key before overwriting it.
        previous = tumbling_table.get(key, None).value()
        event.item += previous.item
        tumbling_table[key] = event
        log.warning(f"consumer::: {key},value: {event.item} date ::: {event.date}")
@app.timer(5)
async def produce0():
    """Send one ``location_10`` event to partition 0 of the source topic.

    Registered with ``app.timer(5)``. Each event carries ``item=1`` and the
    current time as its ``date``.
    """
    location_id = "location_10"
    raw_model = RawModel(item=1, date=time())
    # " :: " separates the label from the timestamp; without it the two run
    # together in the log line.
    log.warning(
        f"produce::: {location_id} value: {raw_model.item} "
        f"date :: {raw_model.date}"
    )
    await source.send(key=location_id, partition=0, value=raw_model)
@app.timer(5)
async def produce1():
    """Send one ``location_11`` event to partition 1 of the source topic.

    Registered with ``app.timer(5)``. Each event carries ``item=1`` and the
    current time as its ``date``.
    """
    location_id = "location_11"
    raw_model = RawModel(item=1, date=time())
    # " :: " separates the label from the timestamp; without it the two run
    # together in the log line.
    log.warning(
        f"produce::: {location_id} value: {raw_model.item} "
        f"date :: {raw_model.date}"
    )
    await source.send(key=location_id, partition=1, value=raw_model)
# Hand control to the app's own entry point only when this file is executed
# as a script, not when it is imported.
if __name__ == '__main__':
    app.main()
[2021-07-14 19:18:38,316] [92170] [WARNING] produce::: location_10 value: 1 date :: 1626270518.316847
[2021-07-14 19:18:38,318] [92170] [WARNING] produce::: location_11 value: 1 date :: 1626270518.318645
[2021-07-14 19:18:38,327] [92170] [WARNING] consumer::: b'location_11',value: 1 date ::: 1626270518.318645
[2021-07-14 19:18:38,330] [92170] [WARNING] consumer::: b'location_10',value: 1 date ::: 1626270518.316847
[2021-07-14 19:18:43,317] [92170] [WARNING] produce::: location_11 value: 1 date :: 1626270523.317833
[2021-07-14 19:18:43,319] [92170] [WARNING] produce::: location_10 value: 1 date :: 1626270523.319649
[2021-07-14 19:18:43,326] [92170] [WARNING] consumer::: b'location_11',value: 1 date ::: 1626270523.317833
[2021-07-14 19:18:43,345] [92170] [WARNING] consumer::: b'location_10',value: 1 date ::: 1626270523.319649
[2021-07-14 19:18:44,037] [92170] [WARNING] processing window: key (b'location_11', (1626270500.0, 1626270519.9)), dump {'date': 1626270518.318645, 'value': 1, '__faust': {'ns': 'HelloWorld.RawModel'}}
[2021-07-14 19:18:48,323] [92170] [WARNING] produce::: location_11 value: 1 date :: 1626270528.32321
[2021-07-14 19:18:48,323] [92170] [WARNING] produce::: location_10 value: 1 date :: 1626270528.3238668
[2021-07-14 19:18:48,330] [92170] [WARNING] consumer::: b'location_11',value: 2 date ::: 1626270528.32321
[2021-07-14 19:18:48,334] [92170] [WARNING] consumer::: b'location_10',value: 2 date ::: 1626270528.3238668
[2021-07-14 19:18:53,325] [92170] [WARNING] produce::: location_11 value: 1 date :: 1626270533.325123
[2021-07-14 19:18:53,326] [92170] [WARNING] produce::: location_10 value: 1 date :: 1626270533.326198
[2021-07-14 19:18:53,339] [92170] [WARNING] consumer::: b'location_11',value: 3 date ::: 1626270533.325123
[2021-07-14 19:18:53,343] [92170] [WARNING] consumer::: b'location_10',value: 3 date ::: 1626270533.326198
[2021-07-14 19:18:58,329] [92170] [WARNING] produce::: location_10 value: 1 date :: 1626270538.329877
[2021-07-14 19:18:58,330] [92170] [WARNING] produce::: location_11 value: 1 date :: 1626270538.330341
[2021-07-14 19:18:58,335] [92170] [WARNING] consumer::: b'location_11',value: 4 date ::: 1626270538.330341
[2021-07-14 19:18:58,340] [92170] [WARNING] consumer::: b'location_10',value: 4 date ::: 1626270538.329877
[2021-07-14 19:19:03,331] [92170] [WARNING] produce::: location_11 value: 1 date :: 1626270543.331743
[2021-07-14 19:19:03,332] [92170] [WARNING] produce::: location_10 value: 1 date :: 1626270543.332333
[2021-07-14 19:19:03,340] [92170] [WARNING] consumer::: b'location_11',value: 1 date ::: 1626270543.331743
[2021-07-14 19:19:03,343] [92170] [WARNING] consumer::: b'location_10',value: 1 date ::: 1626270543.332333
[2021-07-14 19:19:04,082] [92170] [WARNING] processing window: key (b'location_11', (1626270520.0, 1626270539.9)), dump {'date': 1626270538.330341, 'value': 4, '__faust': {'ns': 'HelloWorld.RawModel'}}