1

我将消息从 kafka 主题加载到数据库。加载到数据库可能会失败。我也不想丢失未发送的消息。

应用代码:

import faust

app = faust.App('App', broker='kafka://localhost:9092')

source_topic = app.topic('source_topic')
failed_channel = app.channel()  # channel for unsent messages


@app.agent(source_topic)
async def process(stream):
    async for batch in stream.take(100_000, within=60):
        # here we have not info about partitions and keys
        # to reuse them when resending if sending failed
        try:
            pass  # send to database.  can fail
        except ConnectionError:
            for record in batch:
                # sending to channel is faster than sending to topic
                await failed_channel.send(value=record)


@app.agent(failed_channel)
async def resend_failed(stream):
    async for unsent_msg in stream:
        await source_topic.send(value=unsent_msg)

也许有更标准的方式来处理这种情况?添加 app.topic('source_topic', acks=False) 仅在重新启动应用程序后才有效。

4

1 回答 1

1

我将消息从 kafka 主题加载到数据库

也许有更标准的方式来处理这种情况

是的 - 它被称为 Kafka Connect :-)

标准模式是对您的数据进行任何处理并将其写入 [回] Kafka 主题。然后,您将 Kafka 主题用作 Kafka Connect 接收器连接器的源,在本例中为Kafka Connect JDBC Sink 连接器

Kafka Connect 是 Apache Kafka 的一部分,负责处理重启、横向扩展、故障等。

另请参阅Kafka Connect 实战:JDBC Sink

于 2021-01-28T09:33:44.677 回答