
Setup: 1 Topic (test-topic) with 4 partitions

  • Vert.x: 4.0.3
  • RxJava2: 2.2.12
  • Vert.x internally uses kafka-client: 2.6.0
  • Kafka broker: 2.8.1

Scenario 1: When 1 consumer group with 1 consumer is used, the system takes 60 seconds to finish processing 100 messages.

Scenario 2: When 1 consumer group with 4 consumers is used, the system takes around 15 seconds to process the same 100 messages.

Scenario 3: When 4 consumer groups having the same group id, with 1 consumer per group, are used, the system takes 60 seconds to process the messages.

My assumption was that in Scenario 3 the time taken to process all the messages would be around 15 seconds, but that is not the case. I verified from the logs that the partitions are distributed and that each partition receives 25 messages. Can anyone please help me understand what I might be doing wrong, given that adding consumer groups does not scale? Is this behavior normal? (I do not think so.)

Note:

  • The consumer and producer settings are the defaults, and message delivery is "exactly-once".
  • The code is tested on local setup and on AWS too.

Pseudocode:

    // Stream of records from the subscribed topics
    Observable<KafkaConsumerRecord<String, String>> observable = consumer.toObservable();
    consumer.subscribe(topics);
    observable
            // One transaction per record: begin, send, attach the consumed offset, commit
            .flatMapCompletable(record -> producer.rxBeginTransaction()
                    .andThen(producer.rxSend(producerRecord))
                    .flatMapCompletable(recordMetadata -> Completable.fromAction(() -> {
                        Map<TopicPartition, OffsetAndMetadata> consumerOffsetsMap = new HashMap<>();
                        consumerOffsetsMap.put(
                                new TopicPartition(record.topic(), record.partition()),
                                // The committed offset is the next offset to read, hence + 1
                                new OffsetAndMetadata(record.offset() + 1));
                        producer.getDelegate().unwrap().sendOffsetsToTransaction(consumerOffsetsMap, "grp1");
                    }))
                    .andThen(producer.rxCommitTransaction())
                    .andThen(consumer.rxCommit())
            ).subscribe(() -> {
                System.out.println("Processed successfully");
            });

1 Answer


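One group will take (approximately) the same processing time as multiple independent groups.

With a perfectly balanced topic and immediate consumer group rebalances, one group with distributed consumers takes a fraction of the time, as you have found:

(time of one consumer reading all partitions) / min(number of partitions, number of group members)

In your case, 60 / min(4, 4) = 15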
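The arithmetic above can be sketched as a small helper. This is only an idealized estimate, assuming evenly distributed messages and no rebalance delay; the class and method names are made up for illustration.

```java
public class GroupTimeEstimate {

    // Idealized estimate: the single-consumer time divided by the number of
    // consumers that can actually work in parallel within one group.
    static double estimateSeconds(double singleConsumerSeconds, int partitions, int groupMembers) {
        if (partitions < 1 || groupMembers < 1) {
            throw new IllegalArgumentException("partitions and groupMembers must be >= 1");
        }
        return singleConsumerSeconds / Math.min(partitions, groupMembers);
    }

    public static void main(String[] args) {
        System.out.println(estimateSeconds(60, 4, 1)); // Scenario 1: prints 60.0
        System.out.println(estimateSeconds(60, 4, 4)); // Scenario 2: prints 15.0
        // Members beyond the partition count sit idle, so this also prints 15.0
        System.out.println(estimateSeconds(60, 4, 8));
    }
}
```

Because of the min(...), adding more than 4 members to a group on a 4-partition topic would not be expected to reduce the time further.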


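"Scenario 3: When 4 consumer groups having same group id"

This is not really possible. I assume you mean different group ids. That would be the only reason Scenario 3 takes the same time as Scenario 1, because you are repeating Scenario 1 four times. The fact that running 4 groups takes the same time means that it does scale (horizontally). Reduced time would mean vertical scaling within a group, which is what you saw in Scenario 2.

"Application running on Kubernetes. Just changing replicas to increase the number of instances"

That alone will not create unique consumer groups. You need to inject a different group.id value into each application to create new consumer groups.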
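As a rough sketch of injecting group.id per application, the consumer settings could be built from an environment variable. The variable name KAFKA_GROUP_ID, the fallback "grp1", the broker address, and the class and method names are all assumptions for illustration, not part of the original setup.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerConfigFactory {

    // Resolves the group id from the given environment, falling back to a default
    // when the (hypothetical) KAFKA_GROUP_ID variable is missing or empty.
    static String resolveGroupId(Map<String, String> env, String fallback) {
        String value = env.get("KAFKA_GROUP_ID");
        return (value == null || value.trim().isEmpty()) ? fallback : value;
    }

    // Builds consumer settings using standard Kafka consumer property names.
    static Map<String, String> consumerConfig(String bootstrapServers, String groupId) {
        Map<String, String> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("group.id", groupId);
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return config;
    }

    public static void main(String[] args) {
        // Assumed broker address; replace with the real one.
        String groupId = resolveGroupId(System.getenv(), "grp1");
        Map<String, String> config = consumerConfig("localhost:9092", groupId);
        System.out.println("group.id = " + config.get("group.id"));
    }
}
```

A map like this could then be passed to the Vert.x KafkaConsumer.create(vertx, config) factory. Replicas of one Deployment share the same environment and so would land in the same group; separate groups would need separate values, for example one Deployment per group.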

Answered 2022-01-24T14:52:20.023