2

我们正面临这样一种情况,即我们的 akka-stream-kaka-consumer 处理率在出现延迟时会下降。当我们在分区中没有任何延迟的情况下启动它时,处理速度会突然增加。

MSK 集群 - 10 个主题 - 每个 40 个分区 => 400 个总领导分区

为了在系统中实现高吞吐量和并行性,我们实现了 akka-stream-kafka 消费者,分别订阅每个主题分区,从而在消费者和分区之间实现 1:1 映射。

这是消费者设置:

  1. ec2 服务实例数 - 7
  2. 每个服务为 10 个主题中的每个主题启动 6 个消费者,从而导致每个服务实例有 60 个消费者。
  3. 总消费者 = 实例数 (7) * 每个服务实例上的消费者数 (60) = 420

因此,我们总共启动了 420 个消费者,分布在不同的实例中。根据 RangeAssignor 分区策略(默认一个),每个分区将分配给不同的消费者,400 个消费者将使用 400 个分区,20 个消费者将保持未使用状态。我们已经验证了这个分配并且看起来不错。

使用的实例类型: c5.xlarge

MSK 配置:

阿帕奇卡夫卡版本- 2.4.1.1

经纪人总数- 9(分布在 3 个可用区)

经纪人类型: kafka.m5.large

每个区域的经纪人: 3

auto.create.topics.enable =真

default.replication.factor =3

min.insync.replicas =2

num.io.threads =8

num.network.threads = 5

分区数=40

num.replica.fetchers =2

副本.lag.time.max.ms = 30000

socket.receive.buffer.bytes = 102400

socket.request.max.bytes =104857600

socket.send.buffer.bytes = 102400

unclean.leader.election.enable =真

zookeeper.session.timeout.ms = 18000

log.retention.ms = 259200000

这是我们为每个消费者使用的配置

akka.kafka.consumer {
 kafka-clients {
  bootstrap.servers = "localhost:9092"
  client.id = "consumer1"
  group.id = "consumer1"
  auto.offset.reset="latest"
 }
 aws.glue.registry.name="Registry1"
 aws.glue.avroRecordType = "GENERIC_RECORD"
 aws.glue.region = "region"
 

    kafka.value.deserializer.class="com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer"

 # Settings for checking the connection to the Kafka broker. Connection checking uses `listTopics` requests with the timeout
 # configured by `consumer.metadata-request-timeout`
 connection-checker {

  #Flag to turn on connection checker
  enable = true

  # Amount of attempts to be performed after a first connection failure occurs
  # Required, non-negative integer
  max-retries = 3

  # Interval for the connection check. Used as the base for exponential retry.
  check-interval = 15s

  # Check interval multiplier for backoff interval
  # Required, positive number
  backoff-factor = 2.0
 }
}

akka.kafka.committer {

 # Maximum number of messages in a single commit batch
 max-batch = 10000

 # Maximum interval between commits
 max-interval = 5s

 # Parallelism for async committing
 parallelism = 1500

 # API may change.
 # Delivery of commits to the internal actor
 # WaitForAck: Expect replies for commits, and backpressure the stream if replies do not arrive.
 # SendAndForget: Send off commits to the internal actor without expecting replies (experimental feature since 1.1)
 delivery = WaitForAck

 # API may change.
 # Controls when a `Committable` message is queued to be committed.
 # OffsetFirstObserved: When the offset of a message has been successfully produced.
 # NextOffsetObserved: When the next offset is observed.
 when = OffsetFirstObserved
}


akka.http {
 client {
  idle-timeout = 10s
 }
 host-connection-pool {
  idle-timeout = 10s
  client {
   idle-timeout = 10s
  }
 }
}

consumer.parallelism=1500

我们正在使用下面的代码来实现从 Kafka 到空接收器的流程

override implicit val actorSystem = ActorSystem("Consumer1")
override implicit val materializer = ActorMaterializer()
override implicit val ec = system.dispatcher
val topicsName = "Set of Topic Names"
val parallelism = conf.getInt("consumer.parallelism")


val supervisionDecider: Supervision.Decider = {
 case _ => Supervision.Resume
}

val commiter = committerSettings.getOrElse(CommitterSettings(actorSystem))
val supervisionStrategy = ActorAttributes.supervisionStrategy(supervisionDecider)
Consumer
 .committableSource(consumerSettings, Subscriptions.topics(topicsName))
 .mapAsync(parallelism) {
  msg =>
   f(msg.record.key(), msg.record.value())
    .map(_ => msg.committableOffset)
    .recoverWith {
     case _ => Future.successful(msg.committableOffset)
    }
 }
 .toMat(Committer.sink(commiter).withAttributes(supervisionStrategy))(DrainingControl.apply)
 .withAttributes(supervisionStrategy)

代码中的库版本

"com.typesafe.akka" %% "akka-http"            % "10.1.11",
 "com.typesafe.akka" %% "akka-stream-kafka" % "2.0.3",
 "com.typesafe.akka" %% "akka-stream" % "2.5.30"

观察如下,

  1. 假设在 1 小时的连续间隔中,只有一些消费者
    以预期的速度积极地消耗延迟和处理。
  2. 在接下来的 1 小时内,其他一些消费者变得活跃并
    从其分区中主动消费,然后停止处理。
  3. 从 offsetLag 图表中观察到,所有滞后都将在一次拍摄中清除。

我们希望所有消费者并行运行并实时处理消息。这 3 天的处理延迟给我们造成了很大的停机时间。我尝试按照给定的链接,但我们已经在固定版本 https://github.com/akka/alpakka-kafka/issues/549

任何人都可以帮助我们在消费者配置或其他问题方面缺少什么。

每个主题每个分区的偏移滞后图

4

1 回答 1

2

在我看来,该滞后图表明您的整个系统无法处理所有负载,而且几乎每次只有一个分区实际上正在取得进展。

这种现象向我表明,正在进行的处理f最终决定了某些队列可以被清除的速率,并且mapAsync阶段中的并行度太高,有效地使分区相互竞争。由于 Kafka 消费者对记录进行批处理(默认为 500 条,假设消费者的滞后超过 500 条记录),如果并行度高于此,所有这些记录基本上与一个块同时进入队列。看起来并行度mapAsync是1500;鉴于 Kafka 默认 500 批量大小的明显使用,这似乎太高了:它没有理由大于 Kafka 批量大小,如果你想要分区之间更均匀的消耗率,它应该少很多比那个批量大小。

如果没有详细说明 中发生了什么f,很难说该队列是什么以及应该减少多少并行度。但我可以分享一些一般准则:

  • 如果工作是 CPU 密集型的(这表明您的消费者的 CPU 利用率非常高),那么您有 7 个消费者,每个消费者有 4 个 vCPU。您一次不能物理处理超过 28 (7 x 4) 条记录,因此 mapAsync 中的并行度不应超过 1;或者你需要更多和/或更大的实例
  • 如果工作是 I/O-bound 或以其他方式阻塞的,我会注意工作正在哪个线程池/执行上下文/Akka 调度程序上完成。所有这些通常只会产生有限数量的线程并在所有线程都忙时维护一个工作队列;该工作队列很可能是感兴趣的队列。扩展该池中的线程数量(或者如果使用默认的执行上下文或默认的 Akka 调度程序,将工作负载移动到适当大小的池中)将减少队列的压力
  • 由于您包含akka-http,因此消息的处理可能f涉及向其他服务发送 HTTP 请求。在这种情况下,重要的是要记住 Akka HTTP 为每个目标主机维护一个队列。目标端也可能有一个队列来控制那里的吞吐量。这在某种程度上是第二种(I/O 受限)情况的特例。

I/O 绑定/阻塞情况将通过您的实例上非常低的 CPU 利用率来证明。如果您正在为每个目标主机填充队列,您将看到有关“超出配置的最大打开请求值”的日志消息。

另一件值得注意的事情是,由于 Kafka 消费者本质上是阻塞的,所以 Alpakka Kafka 消费者参与者在他们自己的调度程序中运行,其大小默认为 16,这意味着每个主机一次最多只能有 16 个消费者或生产者工作. 至少将您的应用程序启动的消费者数量设置akka.kafka.default-dispatcher.thread-pool-executor.fixed-pool-size为(每 7 个主题配置的 6 个消费者中的 42 个)可能是一个好主意。Alpakka Kafka 调度程序中的线程饥饿可能导致消费者重新平衡,这将破坏消费。

在不进行任何其他更改的情况下,我建议,为了跨分区更均匀的消耗率,设置

akka.kafka.default-dispatcher.thread-pool-executor.fixed-pool-size = 42
consumer.parallelism = 50
于 2021-10-28T14:59:16.173 回答