
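When a tumbling window is used together with relative_to_field, it does not work as expected. A value updated for a key is not reflected in the same window: reading it back always returns the default value instead of the updated one. Without relative_to_field, it works as expected.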
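To pin down the expected semantics, here is a minimal pure-Python model of a tumbling-window table whose window is chosen from a timestamp carried by the event (the role relative_to_field plays). The class TumblingTableModel and its methods are hypothetical illustration names, not Faust's API, and windows are assumed to be epoch-aligned and half-open, [start, start + size). The point it shows: a write and a read that use the same event timestamp must land in the same window, so the read returns the written value.

```python
from typing import Any, Callable, Dict, Tuple

WindowKey = Tuple[str, float]  # (key, window start)


class TumblingTableModel:
    """Hypothetical in-memory stand-in for a windowed table (not Faust's API)."""

    def __init__(self, size: float, default: Callable[[], Any]) -> None:
        self.size = size
        self.default = default
        self.data: Dict[WindowKey, Any] = {}

    def window_start(self, timestamp: float) -> float:
        # Tumbling windows do not overlap: each timestamp maps to exactly one window.
        return (timestamp // self.size) * self.size

    def get(self, key: str, timestamp: float) -> Any:
        # A window with no stored value falls back to a freshly built default.
        return self.data.get((key, self.window_start(timestamp)), self.default())

    def set(self, key: str, timestamp: float, value: Any) -> None:
        self.data[(key, self.window_start(timestamp))] = value


table = TumblingTableModel(size=5, default=int)
ts = 1617366163.204891
table.set("test", ts, table.get("test", ts) + 100)
print(table.get("test", ts))  # read-after-write in the same window: 100
```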

import time
from datetime import timedelta

import faust

class Count(faust.Record, serializer='json'):
    user: str = ""
    count: int = 0
    timestamp: float = 0

count_schema = faust.Schema(
    key_type=str,
    value_type=Count,
    # key_serializer='str',
    value_serializer='json'
)

app = faust.App("Test_windowed_new", broker="localhost:9092", store='rocksdb://')
count_topic = app.topic("count_topic", schema=count_schema)


def window_processor(key, event):
    # on_window_close callback: key is the windowed key, i.e. (key, window range)
    print("Windowing event 1 : {} Event {}".format(key[0], event))

tumbling_table = (
    app.Table(
        "test_table_diff_new",
        default=Count,
        key_type=str,
        value_type=Count,
        partitions=1,
        on_window_close=window_processor,
    # 5-second tumbling windows; the window is picked from each event's Count.timestamp
    ).tumbling(5, key_index=True, expires=timedelta(seconds=5)).relative_to_field(Count.timestamp)
)


@app.agent(count_topic)
async def count_message(msgs):
    async for key, msg in msgs.items():
        print("incoming message  " + str(msg) + " for key " + key)
        # Read the current value for this key in the window of the current event
        windowSet = tumbling_table[key]
        prev_count = windowSet.value()
        count = Count()
        count.count = msg.count + prev_count.count
        count.user = msg.user
        count.timestamp = msg.timestamp
        # Write the new running total back, then read it again to check it was stored
        tumbling_table[key] = count
        print("Going to update :: " + str(count) + " for key " + key + " for log_time_in_sec  " + str(msg.timestamp))
        print("Value after update :: " + str(tumbling_table[key].value()) + " for key " + key)



@app.timer(interval=1.0, on_leader=True)
async def send_count_kafka():
    # Produce one test event per second, stamped with the current wall-clock time
    count = Count()
    count.user = "test"
    count.count = 100
    count.timestamp = time.time()
    await count_message.send(
        key=str(count.user),
        value=count
    )
    print('Count Message :: send message')

if __name__ == '__main__':
    print("Test Application getting started")
    app.main()
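The agent's read-modify-write step can be modelled without Kafka or Faust to show what the reporter expects from it: each event's own timestamp field selects the window, the previous count in that window is read (or a default if none exists), and the sum is written back. CountEvent, window_start, process and the plain dict store are hypothetical names for illustration, and the input timestamps are made-up values chosen to straddle a window boundary.

```python
from dataclasses import dataclass, replace
from typing import Dict, List, Tuple

WINDOW_SIZE = 5.0  # seconds, mirroring .tumbling(5, ...)


@dataclass(frozen=True)
class CountEvent:
    user: str = ""
    count: int = 0
    timestamp: float = 0.0


def window_start(timestamp: float, size: float = WINDOW_SIZE) -> float:
    # Assumed epoch-aligned tumbling window containing `timestamp`.
    return (timestamp // size) * size


def process(events: List[Tuple[str, CountEvent]]) -> List[int]:
    """Replays the agent loop against a dict keyed by (key, window start)."""
    store: Dict[Tuple[str, float], CountEvent] = {}
    after_update: List[int] = []
    for key, msg in events:
        # Window chosen from the event's own timestamp field (relative_to_field analogue).
        wkey = (key, window_start(msg.timestamp))
        prev = store.get(wkey, CountEvent())  # default value, like default=Count
        store[wkey] = replace(msg, count=msg.count + prev.count)
        # Read back through the same window key, as "Value after update" is meant to.
        after_update.append(store[wkey].count)
    return after_update


events = [("test", CountEvent("test", 100, t)) for t in (0.0, 1.0, 4.9, 5.0, 6.0)]
print(process(events))  # [100, 200, 300, 100, 200]
```

The count accumulates within a window and restarts from the default once an event falls into the next window.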

Expected behavior

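The print statement "Value after update :: " + str(tumbling_table[key].value()), which prints the latest value from the tumbling window right after the update, should return the value written in the previous step: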

[2021-04-02 17:52:43,709] [29841] [WARNING] Value after update :: <Count: user='test', count=100, timestamp=1617366163.204891>  for key test 

Actual behavior

The same print statement, run right after the update, returns the default value instead of the updated value:

[2021-04-02 17:52:43,698] [29841] [WARNING] incoming message  <Count: user='test', count=100, timestamp=1617366163.204891> for key test 
[2021-04-02 17:52:43,709] [29841] [WARNING] Going to update :: <Count: user='test', count=100, timestamp=1617366163.204891> for key test for log_time_in_sec  1617366163.204891 
[2021-04-02 17:52:43,709] [29841] [WARNING] Value after update :: <Count: user='', count=0, timestamp=0> for key test 
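As a sanity check on the log above, the window that both the write and the read should resolve to can be computed from the logged event timestamp. This assumes 5-second windows aligned to multiples of the size and treated as half-open; window_range is a hypothetical helper, not Faust's internal code.

```python
def window_range(timestamp: float, size: float = 5.0) -> tuple:
    """Epoch-aligned tumbling window containing `timestamp` (assumed half-open)."""
    start = (timestamp // size) * size
    return (start, start + size)


# Event timestamp taken from the "incoming message" log line above.
ts = 1617366163.204891
print(window_range(ts))  # (1617366160.0, 1617366165.0)
```

Both the write and the subsequent read use this same event timestamp, so in a correct run they should resolve to the same window.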

Versions

  • Python version: 3.8.8
  • Faust version: v1.10.4
  • Operating system: OSX Big Sur 11.2.3
  • Kafka version: kafka_2.12-2.3.0
  • RocksDB version: 0.7.0

I'm not sure why this simple code doesn't work when the relative_to_field option is combined with a tumbling window.


1 Answer


I assume this is the output you want? That's what I get from the code above; the only difference is that I'm on Ubuntu.

[2021-04-13 10:36:14,106] [953605] [INFO] [^Worker]: Ready 
[2021-04-13 10:36:15,109] [953605] [WARNING] Count Message :: send message 
[2021-04-13 10:36:15,608] [953605] [WARNING] incoming message  <Count: user='test', count=100, timestamp=1618302975.108263> for key test 
[2021-04-13 10:36:15,610] [953605] [WARNING] Going to update :: <Count: user='test', count=100, timestamp=1618302975.108263> for key test for log_time_in_sec  1618302975.108263 
[2021-04-13 10:36:15,610] [953605] [WARNING] Value after update :: <Count: user='test', count=100, timestamp=1618302975.108263> for key test 
[2021-04-13 10:36:16,110] [953605] [WARNING] Count Message :: send message 
[2021-04-13 10:36:16,114] [953605] [WARNING] incoming message  <Count: user='test', count=100, timestamp=1618302976.1102037> for key test 
[2021-04-13 10:36:16,114] [953605] [WARNING] Going to update :: <Count: user='test', count=200, timestamp=1618302976.1102037> for key test for log_time_in_sec  1618302976.1102037 
[2021-04-13 10:36:16,114] [953605] [WARNING] Value after update :: <Count: user='test', count=200, timestamp=1618302976.1102037> for key test 
[2021-04-13 10:36:17,112] [953605] [WARNING] Count Message :: send message 
[2021-04-13 10:36:17,121] [953605] [WARNING] incoming message  <Count: user='test', count=100, timestamp=1618302977.1120646> for key test 
[2021-04-13 10:36:17,121] [953605] [WARNING] Going to update :: <Count: user='test', count=300, timestamp=1618302977.1120646> for key test for log_time_in_sec  1618302977.1120646 
[2021-04-13 10:36:17,121] [953605] [WARNING] Value after update :: <Count: user='test', count=300, timestamp=1618302977.1120646> for key test 
[2021-04-13 10:36:18,114] [953605] [WARNING] Count Message :: send message 
[2021-04-13 10:36:18,120] [953605] [WARNING] incoming message  <Count: user='test', count=100, timestamp=1618302978.1135106> for key test 
[2021-04-13 10:36:18,121] [953605] [WARNING] Going to update :: <Count: user='test', count=400, timestamp=1618302978.1135106> for key test for log_time_in_sec  1618302978.1135106 
[2021-04-13 10:36:18,122] [953605] [WARNING] Value after update :: <Count: user='test', count=400, timestamp=1618302978.1135106> for key test 
[2021-04-13 10:36:19,116] [953605] [WARNING] Count Message :: send message 
[2021-04-13 10:36:19,119] [953605] [WARNING] incoming message  <Count: user='test', count=100, timestamp=1618302979.1159449> for key test 
[2021-04-13 10:36:19,120] [953605] [WARNING] Going to update :: <Count: user='test', count=500, timestamp=1618302979.1159449> for key test for log_time_in_sec  1618302979.1159449 
[2021-04-13 10:36:19,120] [953605] [WARNING] Value after update :: <Count: user='test', count=500, timestamp=1618302979.1159449> for key test 
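The answer's log is consistent with that arithmetic: all five event timestamps fall into the same 5-second window, so the running count grows from 100 to 500 without resetting. A sketch under the same assumption of epoch-aligned windows:

```python
from itertools import accumulate

SIZE = 5.0
# Event timestamps copied from the "incoming message" lines in the log above.
timestamps = [
    1618302975.108263,
    1618302976.1102037,
    1618302977.1120646,
    1618302978.1135106,
    1618302979.1159449,
]

# Start of the window each event falls into.
windows = {(ts // SIZE) * SIZE for ts in timestamps}
# Running total when every event adds 100 within a single window.
running = list(accumulate(100 for _ in timestamps))
print(windows)  # {1618302975.0}
print(running)  # [100, 200, 300, 400, 500]
```

Under this model, an event stamped at 1618302980 or later would fall into the next window, where the count would start again from the default.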
answered 2021-04-13T08:46:31.923