我正在尝试对到达 Kafka 主题(比如“datatopic”)的消息执行查找。查找源是另一个 Kafka 主题(比如“lookuptopic”)。为此,我使用 faust 创建了一个表,并创建了一个代理来使用新消息更新此表。在同一个应用程序中,我创建了另一个代理来执行查找(基于公共 ID 属性)。查找表中只有几千条记录,性能有点慢 - 每条记录 8 毫秒,相当于每秒 125 条记录。我只是想验证我是否以正确的方式做事(很抱歉我无法阅读整个浮士德文档)。
这是代码
import faust,time
from faust import current_event
app = faust.App(
'lookup_table_v1',
broker='kafka://localhost:9092',
topic_partitions=4,store='rocksdb://'
)
class lookupSchema(faust.Record):
id: str
uuid: str
class tableSchema(faust.Record):
uuid: str
class dataSchema(faust.Record):
id: str
lookup_topic = app.topic('lookuptopic', value_type=lookupSchema)
data_topic = app.topic('datatopic', value_type=dataSchema)
lookup_table = app.Table('lookup_table')
# Agent to Update Lookup Table
@app.agent(lookup_topic)
async def count_lookup_table(lookup_stream):
async for record in lookup_stream:
lookup_table[record.id] = tableSchema.loads({"uuid":record.uuid})
# Agent to Perform Lookups for Data arriving in Data Topic
@app.agent(data_topic)
async def perform_lookup(data_stream):
async for data in data_stream:
event = current_event()
print('found:'+lookup_table[data.id].to_representation()["uuid"]+'--> ms:'+str(round((time.time()-event.message.timestamp)*1000)))