我正在运行一个 faust 应用程序,它通过 Kafka 主题接收 protobuf 消息,并将它们插入到滚动窗口表中,其中用户 ID 作为键,值作为包含计数、用户 ID 和记录时间(以秒为单位)的浮士德记录。在随后的更新中,我将传入的计数对象与现有对象合并并更新表。从翻转窗口发出的过期记录被插入到表中。
这里的问题是我看到一些写入表中的记录在到期时没有发出。由于表是relative_to_field,所以我检查了faust记录中的日志时间,它是准时的。我只在生产中看到这个问题。在当地一切正常。
请注意:
- 有关该主题的传入消息采用 protobuf 格式。使用编解码器 (CountsProtoCodec),我将传入的 protobuf 转换为浮士德记录。
- 商店是rocksdb。
- 浮士德版本是 1.10.4
请在这里帮助我。下面是我使用的示例代码片段。
class Count(faust.Record, serializer=‘json’):
def __abstract_init__(self) -> None:
pass
user_id: str = “”
counts: int = 0
log_time_in_sec: float = 0
def window_processor(event_key, event):
batch_start = current_time_ms()
try:
except Exception as e:
logger.error(f’window_processor,{event.user_id} :: Error {e}’, exc_info=True)
pass
app.conf.table_cleanup_interval = 1.0
app.conf.table_key_index_size = 100000
faust.serializers.codecs.register(‘counts_proto’, CountsProtoCodec())
countsProto_schema = faust.Schema(
key_type=str,
value_type=bytes,
value_serializer=‘counts_proto’
)
topic = app.topic(“counts-topic”, schema=countsProto_schema,
partitions=table_partition)
tumbling_table = (
app.Table(
‘combined-table’,
default=Count,
key_type=str,
value_type=Count,
partitions=table_partition,
on_window_close=window_processor,
).tumbling(900, expires=1).relative_to_field(Count.log_time_in_sec)
)
@app.agent(topic)
async def counts_message(msgs):
async for key, message in msgs.items():
try:
if message.counts > 0:
windowSet = tumbling_table.get(key, None)
prev_count = windowSet.value()
message.counts += prev_count.counts
tumbling_table[key] = message
except Exception as e:
logger.error(f’window_processor,{event.user_id} :: Error {e}’, exc_info=True)
pass