
I have 1 consumer group and 5 consumers. There are also 5 partitions, so each consumer gets 1 partition.

The CLI also confirms this:

bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic Topic-1
Topic: Topic-1  TopicId: kJqfk1FoRSWtkkjfsgw9FSg    PartitionCount: 5   ReplicationFactor: 1    Configs: segment.bytes=1073741824
    Topic: Topic-1  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: Topic-1  Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: Topic-1  Partition: 2    Leader: 0   Replicas: 0 Isr: 0
    Topic: Topic-1  Partition: 3    Leader: 0   Replicas: 0 Isr: 0
    Topic: Topic-1  Partition: 4    Leader: 0   Replicas: 0 Isr: 0

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Topic-1 --from-beginning --partition {n} correctly shows different messages for each partition.

However, I often see 2 or more consumers processing the same message, and being new to Kafka, I can't figure out what's going wrong.

I am using pykafka to consume the messages:

import json
import pprint
import traceback

from pykafka import KafkaClient

class CB_Kafka_Consumer:
    def __init__(self):
        self._connect_kafka_consumer()
        module_logger.info(f"Kafka Consumer at {kafka_broker_url}:{kafka_broker_port}")
        ''' Get DB session object '''
        self.session = get_db_session(SQL_USERNAME, SQL_PASSWORD, SQL_SERVER_IP, SQL_DATABASE)
        module_logger.info(f"Connected to MySQL at {SQL_SERVER_IP}")

    def _connect_kafka_consumer(self):
        self._consumer = None
        try:
            self._client = KafkaClient(f"{kafka_broker_url}:{kafka_broker_port}")
            topic = self._client.topics[kafka_topic]
            self._consumer = topic.get_simple_consumer(consumer_group=CONSUMER_GROUP_NAME)

            module_logger.info("Created a Kafka Consumer")
        except Exception:
            module_logger.error('Exception while connecting Kafka')
            traceback.print_exc()

    def start_consuming(self):
        module_logger.info("*" * 10 + " Starting to Consume Messages " + "*" * 10)
        try:
            for msg in self._consumer:
                self._consumer.commit_offsets()
                message = json.loads(msg.value.decode('utf-8'))
                module_logger.debug("\n----- RECEIVED MESSAGE ----------")
                module_logger.debug(pprint.pformat(message))
                self.process_message(message)  # Logic for processing messages (takes anywhere between 10 min and 4 hours to complete)
        finally:
            self._consumer.close()

1 Answer


Print the partition and offset of each message. You should then see whether they really are unique events that you are processing.
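To illustrate: pykafka's consumed messages expose `offset` and, as far as I recall, `partition_id` attributes (check your message class if the names differ). A minimal sketch of the logging helper, using a stand-in message object so it runs without a broker:

```python
from types import SimpleNamespace

def message_identity(msg):
    """Format a message's partition and offset for log output.

    Assumes pykafka-style ``partition_id`` and ``offset`` attributes;
    adjust the names if your client's message class differs.
    """
    return f"partition={msg.partition_id} offset={msg.offset}"

# Stand-in for a consumed message, so this sketch runs without a broker.
fake_msg = SimpleNamespace(partition_id=3, offset=1042, value=b'{"id": 1}')
print(message_identity(fake_msg))  # partition=3 offset=1042
```

If two consumers log the same `(partition, offset)` pair, the message really was delivered twice; if the pairs differ, the events were distinct after all.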

If those are the same, the "10 min to 4 hr" process is very likely triggering consumer group rebalances: Kafka evicts a consumer that does not poll again within `max.poll.interval.ms` (5 minutes by default) and reassigns its partitions. You are then experiencing at-least-once processing semantics, and therefore need to handle duplicates on your own.
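One common way to handle at-least-once delivery is to make processing idempotent by tracking which `(partition, offset)` pairs have already been handled. A minimal sketch (the in-memory set is illustrative only; a real deployment would persist the seen keys, e.g. in the same MySQL database and transaction as the processing result, so deduplication survives restarts):

```python
processed = set()

def handle(partition_id, offset, process):
    """Run ``process`` unless this (partition, offset) was already handled.

    Returns True if the message was processed, False if it was a duplicate.
    """
    key = (partition_id, offset)
    if key in processed:
        return False          # duplicate delivery after a rebalance: skip
    process()
    processed.add(key)        # mark done only after successful processing
    return True

calls = []
handle(0, 7, lambda: calls.append("work"))   # first delivery: processed
handle(0, 7, lambda: calls.append("work"))   # redelivery: skipped
print(len(calls))  # 1
```

Marking the key only after `process()` succeeds keeps the at-least-once guarantee: a crash mid-processing means the message is redelivered and retried rather than lost.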

I see you're using a database client in your code, so the recommendation would be to use the Kafka Connect framework rather than writing your own consumer.
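For instance, Confluent's JDBC sink connector can write topic records into MySQL with configuration alone. A hedged sketch of the connector properties (property names per the Confluent JDBC sink connector; the connection details are placeholders you would fill in):

```properties
name=topic-1-mysql-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=5
topics=Topic-1
connection.url=jdbc:mysql://<host>:3306/<database>
connection.user=<user>
connection.password=<password>
auto.create=true
insert.mode=insert
```

Note that Connect is a fit when the per-record work is the database write itself; long-running custom processing like yours would still need its own service.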

Answered on 2022-02-24T15:45:57.467