0

我对卡夫卡完全陌生。我和简单的消费者一起工作,没关系。现在我已经和平衡的消费者一起工作了,问题是我对卡夫卡只有一个模糊的概念。

    topic = client.topics["topic_name"]
    consumer_a = topic.get_balanced_consumer(consumer_group=b"group_code", zookeeper_connect="localhost:2181")
    consumer_b = topic.get_balanced_consumer(consumer_group=b"group_code", zookeeper_connect="localhost:2181")

    for message in consumer_a:
        if message is not None:
            pool.apply_async(self._handle_message, (message,))

    for message in consumer_b:
        if message is not None:
            pool.apply_async(self._handle_message, (message,))

这是我最初的方法想法。然而,循环只是停留在 consumer_a 中。我不确定如何处理循环退出。我想过让循环也处理我的另一个异步调用,但我不确定它是否正确。

任何人都可以提出相同的建议或指出一些更好的外行可以理解的文档吗?还是我错过了其他一些概念?

我的要求是同时处理两个消费者。谢谢

编辑 如果有人可以解释分区的概念,它也会很有用。

4

0 回答 0