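We are facing a situation where the processing rate of our akka-stream-kafka consumers drops whenever there is lag. When we start them with no lag on any partition, the processing rate suddenly increases.
MSK cluster: 10 topics, 40 partitions each => 400 leader partitions in total
To achieve high throughput and parallelism in the system, we implemented akka-stream-kafka consumers that subscribe to each topic partition separately, resulting in a 1:1 mapping between consumers and partitions.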
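Here is the consumer setup:
- Number of EC2 service instances: 7
- Each service instance starts 6 consumers for each of the 10 topics, i.e. 60 consumers per service instance.
- Total consumers = number of instances (7) * consumers per service instance (60) = 420
So in total we start 420 consumers spread across the instances. With the RangeAssignor partition strategy (the default), each partition is assigned to a different consumer: 400 consumers consume the 400 partitions and 20 consumers remain idle. We have verified this assignment and it looks correct.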
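The arithmetic above can be checked with a small sketch. It assumes that each consumer subscribes to exactly one topic and that assignment follows the documented per-topic behaviour of RangeAssignor (partitions divided by consumers, with the first few consumers in sorted order taking the remainder). The object and method names are hypothetical, and this is a simplified model rather than the Kafka client code.

```scala
// Simplified model of per-topic range assignment (an assumption, not Kafka's code).
object RangeAssignmentSketch {

  /** Partition count per consumer for ONE topic, consumers taken in sorted order. */
  def partitionsPerConsumer(numPartitions: Int, numConsumers: Int): Vector[Int] = {
    val base  = numPartitions / numConsumers // every consumer gets at least this many
    val extra = numPartitions % numConsumers // the first `extra` consumers get one more
    Vector.tabulate(numConsumers)(i => if (i < extra) base + 1 else base)
  }

  // Numbers taken from the setup described above.
  val instances                    = 7
  val consumersPerTopicPerInstance = 6
  val topics                       = 10
  val partitionsPerTopic           = 40

  val consumersPerTopic = instances * consumersPerTopicPerInstance
  val perTopic          = partitionsPerConsumer(partitionsPerTopic, consumersPerTopic)
  val idlePerTopic      = perTopic.count(_ == 0)
  val totalConsumers    = consumersPerTopic * topics
  val totalIdle         = idlePerTopic * topics

  def main(args: Array[String]): Unit = {
    println(s"consumers per topic: $consumersPerTopic, idle per topic: $idlePerTopic")
    println(s"total consumers: $totalConsumers, total idle: $totalIdle")
  }
}
```

Under these assumptions each topic has 42 consumers for 40 partitions, so 2 consumers per topic stay idle, which matches the 20 idle consumers we see overall.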
Instance type used: c5.xlarge
MSK configuration:
Apache Kafka version: 2.4.1.1
Total number of brokers: 9 (spread across 3 availability zones)
Broker type: kafka.m5.large
Brokers per zone: 3
auto.create.topics.enable = true
default.replication.factor = 3
min.insync.replicas = 2
num.io.threads = 8
num.network.threads = 5
num.partitions = 40
num.replica.fetchers = 2
replica.lag.time.max.ms = 30000
socket.receive.buffer.bytes = 102400
socket.request.max.bytes = 104857600
socket.send.buffer.bytes = 102400
unclean.leader.election.enable = true
zookeeper.session.timeout.ms = 18000
log.retention.ms = 259200000
This is the configuration we use for each consumer:
akka.kafka.consumer {
  kafka-clients {
    bootstrap.servers = "localhost:9092"
    client.id = "consumer1"
    group.id = "consumer1"
    auto.offset.reset = "latest"
  }
  aws.glue.registry.name = "Registry1"
  aws.glue.avroRecordType = "GENERIC_RECORD"
  aws.glue.region = "region"
  kafka.value.deserializer.class = "com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer"
  # Settings for checking the connection to the Kafka broker. Connection checking uses `listTopics` requests with the timeout
  # configured by `consumer.metadata-request-timeout`
  connection-checker {
    # Flag to turn on connection checker
    enable = true
    # Amount of attempts to be performed after a first connection failure occurs
    # Required, non-negative integer
    max-retries = 3
    # Interval for the connection check. Used as the base for exponential retry.
    check-interval = 15s
    # Check interval multiplier for backoff interval
    # Required, positive number
    backoff-factor = 2.0
  }
}
akka.kafka.committer {
  # Maximum number of messages in a single commit batch
  max-batch = 10000
  # Maximum interval between commits
  max-interval = 5s
  # Parallelism for async committing
  parallelism = 1500
  # API may change.
  # Delivery of commits to the internal actor
  # WaitForAck: Expect replies for commits, and backpressure the stream if replies do not arrive.
  # SendAndForget: Send off commits to the internal actor without expecting replies (experimental feature since 1.1)
  delivery = WaitForAck
  # API may change.
  # Controls when a `Committable` message is queued to be committed.
  # OffsetFirstObserved: When the offset of a message has been successfully produced.
  # NextOffsetObserved: When the next offset is observed.
  when = OffsetFirstObserved
}
akka.http {
  client {
    idle-timeout = 10s
  }
  host-connection-pool {
    idle-timeout = 10s
    client {
      idle-timeout = 10s
    }
  }
}
consumer.parallelism = 1500
We are using the code below to materialize the flow from Kafka to an empty sink:
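import akka.actor.ActorSystem
import akka.kafka.{CommitterSettings, Subscriptions}
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import scala.concurrent.Future

override implicit val actorSystem = ActorSystem("Consumer1")
override implicit val materializer = ActorMaterializer()
override implicit val ec = actorSystem.dispatcher

val topicsName = "Set of Topic Names" // placeholder for the subscribed topic names
val parallelism = conf.getInt("consumer.parallelism")

// Resume the stream on any failure instead of stopping it
val supervisionDecider: Supervision.Decider = {
  case _ => Supervision.Resume
}
val committer = committerSettings.getOrElse(CommitterSettings(actorSystem))
val supervisionStrategy = ActorAttributes.supervisionStrategy(supervisionDecider)

Consumer
  .committableSource(consumerSettings, Subscriptions.topics(topicsName))
  .mapAsync(parallelism) { msg =>
    // f is our processing function; the offset is passed downstream even if f fails
    f(msg.record.key(), msg.record.value())
      .map(_ => msg.committableOffset)
      .recoverWith {
        case _ => Future.successful(msg.committableOffset)
      }
  }
  .toMat(Committer.sink(committer).withAttributes(supervisionStrategy))(DrainingControl.apply)
  .withAttributes(supervisionStrategy)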
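To make explicit what the `mapAsync` stage in our flow does when processing fails, here is a minimal sketch using plain Scala Futures only (no Akka or Kafka). `process` is a hypothetical stand-in for our `f`, and a `Long` stands in for the committable offset. The sketch only illustrates the `map` / `recoverWith` shape, not the stream itself.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RecoverSketch {

  // Hypothetical stand-in for f: fails for keys starting with "bad".
  def process(key: String): Future[Unit] =
    if (key.startsWith("bad")) Future.failed(new RuntimeException(s"cannot process $key"))
    else Future.successful(())

  // Same shape as the mapAsync body: the offset is emitted
  // whether or not processing succeeded.
  def offsetAfterProcessing(key: String, offset: Long): Future[Long] =
    process(key)
      .map(_ => offset)
      .recoverWith { case _ => Future.successful(offset) }

  def main(args: Array[String]): Unit = {
    val results = Future.sequence(
      Seq(offsetAfterProcessing("ok-1", 1L), offsetAfterProcessing("bad-2", 2L))
    )
    // Both offsets come back, including the one whose processing failed.
    println(Await.result(results, 5.seconds))
  }
}
```

In other words, a failed message is not retried in this setup: its offset still reaches the committer, so failures in `f` should not by themselves stall a partition.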
Library versions used in the code:
"com.typesafe.akka" %% "akka-http" % "10.1.11",
"com.typesafe.akka" %% "akka-stream-kafka" % "2.0.3",
"com.typesafe.akka" %% "akka-stream" % "2.5.30"
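Our observations are as follows:
- Over successive 1-hour intervals, only some of the consumers are actively consuming the lag and processing at the expected rate.
- In the next hour, some other consumers become active, consume from their partitions, and then stop processing.
- As seen in the offsetLag graph, all of the lag gets cleared in one shot.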
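We want all consumers to run in parallel and process messages in real time. This 3-day processing delay has caused us a lot of downtime. I tried following the issue linked here, but we are already on a version that includes the fix: https://github.com/akka/alpakka-kafka/issues/549
Can anyone help us figure out what we are missing in the consumer configuration, or whether the problem lies elsewhere?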