我将消息从 kafka 主题加载到数据库。加载到数据库可能会失败。我也不想丢失未发送的消息。
应用代码:
import faust
app = faust.App('App', broker='kafka://localhost:9092')
source_topic = app.topic('source_topic')
failed_channel = app.channel() # channel for unsent messages
@app.agent(source_topic)
async def process(stream):
async for batch in stream.take(100_000, within=60):
# here we have not info about partitions and keys
# to reuse them when resending if sending failed
try:
pass # send to database. can fail
except ConnectionError:
for record in batch:
# sending to channel is faster than sending to topic
await failed_channel.send(value=record)
@app.agent(failed_channel)
async def resend_failed(stream):
async for unsent_msg in stream:
await source_topic.send(value=unsent_msg)
也许有更标准的方式来处理这种情况?添加 app.topic('source_topic', acks=False) 仅在重新启动应用程序后才有效。