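When a tumbling window is used together with relative_to_field, it does not work as expected. A value written for a key is not visible when the same key is read back in the same window: the read always returns the default value instead of the updated one. Without relative_to_field, it works as expected.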
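For context, a tumbling window of size 5 assigns each event to exactly one non-overlapping 5-second range based on the event's timestamp. With relative_to_field(Count.timestamp), that timestamp is taken from the record's timestamp field instead of the default event time. The sketch below is a hypothetical illustration of that mapping only. It assumes epoch-aligned, half-open [start, start + size) windows, and `tumbling_range` is an invented helper, not part of Faust. Faust's exact boundary convention may differ.

```python
# Hypothetical illustration, not Faust's implementation: assumes tumbling
# windows are aligned to multiples of the window size (epoch-aligned) and
# are half-open [start, start + size).
from typing import Tuple

WINDOW_SIZE = 5.0  # seconds, matching .tumbling(5, ...) in the report


def tumbling_range(timestamp: float, size: float = WINDOW_SIZE) -> Tuple[float, float]:
    """Return the (start, end) range of the tumbling window containing timestamp."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


if __name__ == "__main__":
    # Count.timestamp value taken from the log in this report.
    print(tumbling_range(1617366163.204891))
```

Any read and write that use timestamps inside the same 5-second range should hit the same window slot for a given key.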
import time
from datetime import timedelta

import faust


class Count(faust.Record, serializer='json'):
    user: str = ""
    count: int = 0
    timestamp: float = 0


count_schema = faust.Schema(
    key_type=str,
    value_type=Count,
    # key_serializer='str',
    value_serializer='json',
)

app = faust.App("Test_windowed_new", broker="localhost:9092", store='rocksdb://')
count_topic = app.topic("count_topic", schema=count_schema)


def window_processor(key, event):
    # Invoked by Faust when a window closes.
    print("Windowing event 1 : {} Event {}".format(key[0], event))


tumbling_table = (
    app.Table(
        "test_table_diff_new",
        default=Count,
        key_type=str,
        value_type=Count,
        partitions=1,
        on_window_close=window_processor,
    )
    .tumbling(5, key_index=True, expires=timedelta(seconds=5))
    # Window ranges are computed from the record's own timestamp field.
    .relative_to_field(Count.timestamp)
)


@app.agent(count_topic)
async def count_message(msgs):
    async for key, msg in msgs.items():
        print("incoming message " + str(msg) + " for key " + key)
        # Read the current value for this key in the event's window.
        windowSet = tumbling_table[key]
        prev_count = windowSet.value()
        count = Count()
        count.count = msg.count + prev_count.count
        count.user = msg.user
        count.timestamp = msg.timestamp
        tumbling_table[key] = count
        print("Going to update :: " + str(count) + " for key " + key
              + " for log_time_in_sec " + str(msg.timestamp))
        # Expected to print the value written just above.
        print("Value after update :: " + str(tumbling_table[key].value()) + " for key " + key)


@app.timer(interval=1.0, on_leader=True)
async def send_count_kafka():
    count = Count()
    count.user = "test"
    count.count = 100
    count.timestamp = time.time()
    await count_message.send(
        key=str(count.user),
        value=count,
    )
    print('Count Message :: send message')


if __name__ == '__main__':
    print("Test Application getting started")
    app.main()
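Expected behavior
The print statement that reads the latest value back from the tumbling window after the update, "Value after update :: " + str(tumbling_table[key].value()), should return the value written in the previous step: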
[2021-04-02 17:52:43,709] [29841] [WARNING] Value after update :: <Count: user='test', count=100, timestamp=1617366163.204891> for key test
Actual behavior
The same print statement returns the default value instead of the updated value:
[2021-04-02 17:52:43,698] [29841] [WARNING] incoming message <Count: user='test', count=100, timestamp=1617366163.204891> for key test
[2021-04-02 17:52:43,709] [29841] [WARNING] Going to update :: <Count: user='test', count=100, timestamp=1617366163.204891> for key test for log_time_in_sec 1617366163.204891
[2021-04-02 17:52:43,709] [29841] [WARNING] Value after update :: <Count: user='', count=0, timestamp=0> for key test
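One way to get exactly this output is for the write and the subsequent read to resolve to different window slots. The toy model below is a hypothetical sketch, not Faust's internals: `ToyWindowedTable`, its `set`/`get` methods, and the epoch-aligned `tumbling_range` helper are invented for illustration. It stores values in a dict keyed by (key, window range) and returns a fresh default for missing slots, mirroring `default=Count`. It only reproduces the symptom; it does not establish which timestamps Faust actually uses for reads and writes under relative_to_field.

```python
# Hypothetical toy model, not Faust's API: a windowed table stored as a dict
# keyed by (key, window_range), returning a default for missing slots.
from dataclasses import dataclass
from typing import Dict, Tuple

WINDOW_SIZE = 5.0  # seconds, matching .tumbling(5, ...) in the report
Range = Tuple[float, float]


@dataclass
class Count:
    user: str = ""
    count: int = 0
    timestamp: float = 0.0


def tumbling_range(timestamp: float, size: float = WINDOW_SIZE) -> Range:
    # Assumption: epoch-aligned, half-open [start, start + size) windows.
    start = timestamp - (timestamp % size)
    return (start, start + size)


class ToyWindowedTable:
    def __init__(self) -> None:
        self._data: Dict[Tuple[str, Range], Count] = {}

    def set(self, key: str, value: Count, timestamp: float) -> None:
        # The write lands in the window that contains `timestamp`.
        self._data[(key, tumbling_range(timestamp))] = value

    def get(self, key: str, timestamp: float) -> Count:
        # A missing slot yields a fresh default, like default=Count.
        return self._data.get((key, tumbling_range(timestamp)), Count())


if __name__ == "__main__":
    table = ToyWindowedTable()
    event_ts = 1617366163.204891  # Count.timestamp from the log above
    table.set("test", Count("test", 100, event_ts), event_ts)
    print(table.get("test", event_ts))        # same window: the written value
    print(table.get("test", event_ts + 5.0))  # next window: the default Count()
```

If the read used a timestamp from a different 5-second range than the write, the second print would match the "Value after update" line in the actual log.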
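Versions
- Python version: 3.8.8
- Faust version: v1.10.4
- Operating system: OSX Big Sur 11.2.3
- Kafka version: kafka_2.12-2.3.0
- RocksDB version: 0.7.0
I'm not sure why this simple code doesn't work when the relative_to_field option is used with a tumbling window.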