1

我正在运行一个 faust 应用程序,它通过 Kafka 主题接收 protobuf 消息,并将它们插入到滚动窗口表中,其中用户 ID 作为键,值作为包含计数、用户 ID 和记录时间(以秒为单位)的浮士德记录。在随后的更新中,我将传入的计数对象与现有对象合并并更新表。从翻转窗口发出的过期记录被插入到表中。

这里的问题是我看到一些写入表中的记录在到期时没有发出。由于表是relative_to_field,所以我检查了faust记录中的日志时间,它是准时的。我只在生产中看到这个问题。在当地一切正常。

请注意:

  1. 有关该主题的传入消息采用 protobuf 格式。使用编解码器 (CountsProtoCodec),我将传入的 protobuf 转换为浮士德记录。
  2. 商店是rocksdb。
  3. 浮士德版本是 1.10.4

请在这里帮助我。下面是我使用的示例代码片段。

class Count(faust.Record, serializer=‘json’):
    def __abstract_init__(self) -> None:
        pass
    user_id: str = “”
    counts: int = 0
    log_time_in_sec: float = 0
def window_processor(event_key, event):
    batch_start = current_time_ms()
    try:
    except Exception as e:
        logger.error(f’window_processor,{event.user_id} ::  Error  {e}’, exc_info=True)
        pass
app.conf.table_cleanup_interval = 1.0
app.conf.table_key_index_size = 100000
faust.serializers.codecs.register(‘counts_proto’, CountsProtoCodec())
countsProto_schema = faust.Schema(
    key_type=str,
    value_type=bytes,
    value_serializer=‘counts_proto’
)
topic = app.topic(“counts-topic”, schema=countsProto_schema,
                           partitions=table_partition)
tumbling_table = (
    app.Table(
        ‘combined-table’,
        default=Count,
        key_type=str,
        value_type=Count,
        partitions=table_partition,
        on_window_close=window_processor,
    ).tumbling(900, expires=1).relative_to_field(Count.log_time_in_sec)
)
@app.agent(topic)
async def counts_message(msgs):
    async for key, message in msgs.items():
        try:
            if message.counts > 0:
                windowSet = tumbling_table.get(key, None)
                prev_count = windowSet.value()
                message.counts += prev_count.counts
                tumbling_table[key] = message
        except Exception as e:
        logger.error(f’window_processor,{event.user_id} ::  Error  {e}’, exc_info=True)
        pass 
4

0 回答 0