Setup: 1 Topic (test-topic) with 4 partitions
- Vert.x: 4.0.3
- RxJava2: 2.2.12
- Vert.x internally uses kafka-client: 2.6.0
- Kafka broker: 2.8.1
Scenario 1: With 1 consumer group containing 1 consumer, the system takes 60 seconds to process 100 messages.
Scenario 2: With 1 consumer group containing 4 consumers, the system takes around 15 seconds to process the same 100 messages.
Scenario 3: With 4 consumer groups sharing the same group id and 1 consumer per group, the system takes 60 seconds to process the same 100 messages.
My assumption was that in Scenario 3 the time taken to process all the messages would also be around 15 seconds, but that is not the case. I verified from the logs that the partitions are distributed and that each partition receives 25 messages. Can anyone help me understand what I might be doing wrong, such that adding consumer groups does not scale? Is this behavior normal? I do not think it is.
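For reference, the 15-second expectation comes from the back-of-the-envelope arithmetic below. It is only a sketch under two assumptions: the per-message cost is constant (about 600 ms, derived from Scenario 1), and the 4 partitions are processed fully in parallel. The class and method names are made up for illustration and are not part of the actual application.

```java
public class ExpectedProcessingTime {

    // Estimated wall-clock time in milliseconds when messages are spread
    // evenly over consumers that work in parallel (an idealized assumption).
    public static long estimateMillis(int messages, long perMessageMillis, int parallelConsumers) {
        if (messages < 0 || perMessageMillis < 0 || parallelConsumers <= 0) {
            throw new IllegalArgumentException("invalid input");
        }
        // Ceiling division: the busiest consumer handles this many messages.
        int perConsumer = (messages + parallelConsumers - 1) / parallelConsumers;
        return perConsumer * perMessageMillis;
    }

    public static void main(String[] args) {
        // Scenario 1: 60 s for 100 messages gives 600 ms per message.
        long perMessageMillis = 60_000 / 100;
        System.out.println("1 consumer:  " + estimateMillis(100, perMessageMillis, 1) + " ms");
        System.out.println("4 consumers: " + estimateMillis(100, perMessageMillis, 4) + " ms");
    }
}
```

With these numbers the estimate is 60000 ms for one consumer and 15000 ms for four, which matches Scenarios 1 and 2 but not Scenario 3.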
Note:
- The consumer and producer settings are the defaults, and message delivery is "exactly-once".
- The code was tested on a local setup and on AWS.
Pseudocode:
Observable<KafkaConsumerRecord<String, String>> observable = consumer.toObservable();
consumer.subscribe(topics);
observable
    // One transaction per record: begin, send, attach the consumed offset, commit.
    .flatMapCompletable(record -> producer.rxBeginTransaction()
        .andThen(producer.rxSend(producerRecord))
        .flatMapCompletable(recordMetadata -> Completable.fromAction(() -> {
            Map<TopicPartition, OffsetAndMetadata> consumerOffsetsMap = new HashMap<>();
            // The committed offset is the next offset to read, hence offset + 1.
            consumerOffsetsMap.put(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
            // Blocking call on the underlying native Kafka producer.
            producer.getDelegate().unwrap().sendOffsetsToTransaction(consumerOffsetsMap, "grp1");
        }))
        .andThen(producer.rxCommitTransaction())
        .andThen(consumer.rxCommit())
    ).subscribe(
        () -> System.out.println("Processed successfully"),
        error -> System.err.println("Processing failed: " + error));