3

我正在尝试对到达 Kafka 主题(比如“datatopic”)的消息执行查找。查找源是另一个 Kafka 主题(比如“lookuptopic”)。为此,我使用 faust 创建了一个表,并创建了一个代理来使用新消息更新此表。在同一个应用程序中,我创建了另一个代理来执行查找(基于公共 ID 属性)。查找表中只有几千条记录,性能有点慢 - 每条记录 8 毫秒,相当于每秒 125 条记录。我只是想验证我是否以正确的方式做事(很抱歉我无法阅读整个浮士德文档)。

这是代码

import faust,time
from faust import current_event

app = faust.App(
    'lookup_table_v1',
    broker='kafka://localhost:9092',
    topic_partitions=4,store='rocksdb://'
)

class lookupSchema(faust.Record):
    id: str
    uuid: str

class tableSchema(faust.Record):
    uuid: str

class dataSchema(faust.Record):
    id: str


lookup_topic = app.topic('lookuptopic', value_type=lookupSchema)
data_topic = app.topic('datatopic', value_type=dataSchema)

lookup_table = app.Table('lookup_table')


# Agent to Update Lookup Table
@app.agent(lookup_topic)
async def count_lookup_table(lookup_stream):
    async for record in lookup_stream:
        lookup_table[record.id] = tableSchema.loads({"uuid":record.uuid}) 


# Agent to Perform Lookups for Data arriving in Data Topic
@app.agent(data_topic)
async def perform_lookup(data_stream):
  async for data in data_stream:
    event = current_event()
    print('found:'+lookup_table[data.id].to_representation()["uuid"]+'--> ms:'+str(round((time.time()-event.message.timestamp)*1000)))
4

0 回答 0