0

我有 4 个消费者使用消费者组 ID 为一个主题消费数据。在 4 个消费者中,其中 2 个获得了分配给多个分区的仅一个分区的数据,其中 2 个获得了分配给该消费者的所有分区的数据。

如何让消费者获得分配给它的所有分区的数据?

我正在使用 fs2-kafka。

val brokers = "broker1:9094,broker2:9094,broker3:9094,broker4:9094"

val consumerSettings = ConsumerSettings[F, String, Array[Byte]]
      .withEnableAutoCommit(false)
      .withAutoOffsetReset(AutoOffsetReset.Earliest)
      .withMaxPollInterval(new FiniteDuration(300000, TimeUnit.MILLISECONDS))
      .withPollTimeout(new FiniteDuration(300000, TimeUnit.MILLISECONDS))
      .withSessionTimeout(new FiniteDuration(30000, TimeUnit.MILLISECONDS))
      .withHeartbeatInterval(new FiniteDuration(10000, TimeUnit.MILLISECONDS))
      .withCommitTimeout(new FiniteDuration(300000, TimeUnit.MILLISECONDS))
      .withBootstrapServers(brokers)
      .withClientId(UUID.randomUUID().toString)
      .withGroupId("testConsumerGroupId")

 def processChunks(
      data: fs2.Chunk[CommittableConsumerRecord[F, K, V]]
  )(implicit ce: ConcurrentEffect[F]): fs2.Stream[F, CommittableOffset[F]] =
    for {
      _ <- fs2.Stream.eval(handleChunkOfWork(data.map(_.record).toList))
      offset <- fs2.Stream.chunk(data).map(_.offset)
    } yield offset

 def commitBatches(implicit ce: ConcurrentEffect[F]): Pipe[F, CommittableOffset[F], Unit] =
    _.chunks
      .evalMap { chunk =>
        for {
          _ <- CommittableOffsetBatch.fromFoldable(chunk).commit
        } yield ()
      }

consumerStream[F]
  .using(kafkaConsumerSettings.consumerSettings)
  .evalTap(_.subscribe("topic1"))
  .flatMap(_.partitionedStream)
  .flatMap { partionedStream =>
    partionedStream.chunkLimit(streamConfig.batchSize).map(processChunks).map(commitBatches)
  }
  .parJoinUnbounded
  .compile
  .drain
4

0 回答 0