我有 4 个消费者使用消费者组 ID 为一个主题消费数据。在 4 个消费者中,其中 2 个获得了分配给多个分区的仅一个分区的数据,其中 2 个获得了分配给该消费者的所有分区的数据。
如何让消费者获得分配给它的所有分区的数据?
我正在使用 fs2-kafka。
val brokers = "broker1:9094,broker2:9094,broker3:9094,broker4:9094"
val consumerSettings = ConsumerSettings[F, String, Array[Byte]]
.withEnableAutoCommit(false)
.withAutoOffsetReset(AutoOffsetReset.Earliest)
.withMaxPollInterval(new FiniteDuration(300000, TimeUnit.MILLISECONDS))
.withPollTimeout(new FiniteDuration(300000, TimeUnit.MILLISECONDS))
.withSessionTimeout(new FiniteDuration(30000, TimeUnit.MILLISECONDS))
.withHeartbeatInterval(new FiniteDuration(10000, TimeUnit.MILLISECONDS))
.withCommitTimeout(new FiniteDuration(300000, TimeUnit.MILLISECONDS))
.withBootstrapServers(brokers)
.withClientId(UUID.randomUUID().toString)
.withGroupId("testConsumerGroupId")
def processChunks(
data: fs2.Chunk[CommittableConsumerRecord[F, K, V]]
)(implicit ce: ConcurrentEffect[F]): fs2.Stream[F, CommittableOffset[F]] =
for {
_ <- fs2.Stream.eval(handleChunkOfWork(data.map(_.record).toList))
offset <- fs2.Stream.chunk(data).map(_.offset)
} yield offset
def commitBatches(implicit ce: ConcurrentEffect[F]): Pipe[F, CommittableOffset[F], Unit] =
_.chunks
.evalMap { chunk =>
for {
_ <- CommittableOffsetBatch.fromFoldable(chunk).commit
} yield ()
}
consumerStream[F]
.using(kafkaConsumerSettings.consumerSettings)
.evalTap(_.subscribe("topic1"))
.flatMap(_.partitionedStream)
.flatMap { partionedStream =>
partionedStream.chunkLimit(streamConfig.batchSize).map(processChunks).map(commitBatches)
}
.parJoinUnbounded
.compile
.drain