I have a Kafka consumer that looks like this:

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

import scala.util.{Failure, Success}

object App {
  def main(args: Array[String]): Unit = {


    implicit val system = ActorSystem("SAP-SENDER")
    implicit val executor = system.dispatcher
    implicit val materializer = ActorMaterializer()

    val config = system.settings.config.getConfig("akka.kafka.consumer")

    val consumerSettings: ConsumerSettings[String, String] =
      ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers("localhost:9003")
        .withGroupId("SAPSENDER")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

    Consumer
      .plainSource(
        consumerSettings,
        Subscriptions.topics("TEST-TOPIC")
      )
      .runWith(Sink.foreach(println))
      .onComplete{
        case Success(_) => println("Goood")
        case Failure(ex) =>
          println(s"I am failed ==============> ${ex.getMessage}")
          system.terminate()
      }

  }
} 

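The Kafka server is not running, and I just want the consumer to terminate. Instead, it keeps trying to connect and prints the following messages: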

19:03:47.342 [SAP-SENDER-akka.kafka.default-dispatcher-15] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=SAPSENDER] Pausing partitions []
19:03:47.342 [SAP-SENDER-akka.kafka.default-dispatcher-15] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=SAPSENDER] No broker available to send FindCoordinator request
19:03:47.342 [SAP-SENDER-akka.kafka.default-dispatcher-15] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=SAPSENDER] Give up sending metadata request since no node is available
19:03:47.342 [SAP-SENDER-akka.kafka.default-dispatcher-15] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=SAPSENDER] Coordinator discovery failed, refreshing metadata
19:03:47.342 [SAP-SENDER-akka.kafka.default-dispatcher-15] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=SAPSENDER] Give up sending metadata request since no node is available
19:03:47.412 [SAP-SENDER-akka.kafka.default-dispatcher-17] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=SAPSENDER] Pausing partitions []
19:03:47.412 [SAP-SENDER-akka.kafka.default-dispatcher-17] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=SAPSENDER] No broker available to send FindCoordinator request
19:03:47.412 [SAP-SENDER-akka.kafka.default-dispatcher-17] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=SAPSENDER] Give up sending metadata request since no node is available
19:03:47.412 [SAP-SENDER-akka.kafka.default-dispatcher-17] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=SAPSENDER] Coordinator discovery failed, refreshing metadata
19:03:47.412 [SAP-SENDER-akka.kafka.default-dispatcher-17] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=SAPSENDER] Give up sending metadata request since no node is available
19:03:47.478 [SAP-SENDER-akka.kafka.default-dispatcher-20] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=SAPSENDER] Pausing partitions []   

It also says:

java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:173)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:515)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:316)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
    at akka.kafka.internal.KafkaConsumerActor.poll(KafkaConsumerActor.scala:380)
    at akka.kafka.internal.KafkaConsumerActor.akka$kafka$internal$KafkaConsumerActor$$receivePoll(KafkaConsumerActor.scala:360)
    at akka.kafka.internal.KafkaConsumerActor$$anonfun$receive$1.applyOrElse(KafkaConsumerActor.scala:221)
    at akka.actor.Actor.aroundReceive(Actor.scala:539)
    at akka.actor.Actor.aroundReceive$(Actor.scala:537)
    at akka.kafka.internal.KafkaConsumerActor.akka$actor$Timers$$super$aroundReceive(KafkaConsumerActor.scala:142)
    at akka.actor.Timers.aroundReceive(Timers.scala:51)
    at akka.actor.Timers.aroundReceive$(Timers.scala:40)
    at akka.kafka.internal.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:142)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:610)
    at akka.actor.ActorCell.invoke(ActorCell.scala:579)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
    at akka.dispatch.Mailbox.run(Mailbox.scala:229)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)  

How can I catch the ConnectException in the stream and stop the consumer from trying to connect to Kafka?

The code is hosted here: https://gitlab.com/akka-samples/kafkaconsumer


3 Answers


With Kafka client 2.0+, Alpakka Kafka cannot detect that no Kafka broker is available at the given address.

https://github.com/akka/alpakka-kafka/issues/674
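
Since the stream itself does not report the missing broker, one possible workaround is to probe the bootstrap address with a plain TCP connect before starting the consumer. The sketch below is an assumption, not something Alpakka Kafka or the Kafka client provides: `BrokerProbe` and `isReachable` are hypothetical names, and a successful connect only shows that something is listening on the port, not that it is a healthy broker.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

// Hypothetical helper, not part of Alpakka Kafka or the Kafka client.
object BrokerProbe {
  /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
  def isReachable(host: String, port: Int, timeoutMs: Int = 1000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers connection refused, connect timeout and unknown host.
      case _: IOException => false
    } finally {
      Try(socket.close())
    }
  }
}
```

In the question's `main`, this could guard the stream: if `BrokerProbe.isReachable("localhost", 9003)` returns false, call `system.terminate()` instead of running `Consumer.plainSource`.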

Answered 2019-04-26T15:01:33.493

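Looking at this PR and the work done to upgrade to Kafka client 2.0, my guess is that much of the retry responsibility has been delegated to the Kafka client. For example, I tried passing these properties: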

val consumerSettings: ConsumerSettings[String, String] =
  ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
    .withProperties(
      "reconnect.backoff.ms" -> "10000",
      "reconnect.backoff.max.ms" -> "20000"
    )
    .withBootstrapServers("localhost:9099")
    .withGroupId("SAPSENDER")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

and the exception then showed up a second time after 10 seconds. I found these properties here.
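
For reference, the Kafka client configuration documentation describes `reconnect.backoff.ms` as the initial wait and `reconnect.backoff.max.ms` as the cap, with the wait growing exponentially per consecutive connection failure and roughly 20% random jitter added. A rough sketch of that schedule follows. It ignores jitter and assumes a doubling factor; `ReconnectBackoff` and `delays` are names made up for illustration, not client API.

```scala
// Illustrative only: approximates the nominal reconnect delays, without jitter.
object ReconnectBackoff {
  /** Nominal delay in ms before each of the first n reconnect attempts. */
  def delays(initialMs: Long, maxMs: Long, n: Int): Seq[Long] =
    (0 until n).map { failures =>
      // Assumed doubling per consecutive failure, capped at maxMs.
      // Bounding the exponent keeps the multiplication from overflowing.
      val factor = 1L << math.min(failures, 30)
      math.min(initialMs * factor, maxMs)
    }
}

// With the settings used above:
// ReconnectBackoff.delays(10000, 20000, 4) yields 10000, 20000, 20000, 20000
```

Under these assumptions the first retry lands about 10 seconds after the initial failure, which matches the observation above, and later retries settle at about 20 seconds apart.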

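Given this, I think the new adaptation to the Kafka client may be missing a feature, because KafkaConsumerActor does not expose the exception to the stream. I tried various combinations using your repository, but I still get a continuous stream of debug messages.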

I hope this gives some hints in the right direction. If you solve it, please let us know.

Answered 2019-04-22T20:42:45.333

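You should monitor your stream and restart it when an error occurs. For example, you can run your stream inside an actor and handle connection errors through actor supervision.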

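Connection errors may last a few seconds (for example, when the network is overloaded), so you should use a backoff strategy to avoid a retry storm.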

Akka Streams already provides a way to do this with RestartSource. See Error handling.

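import java.util.concurrent.atomic.AtomicReference
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.{RestartSource, Sink}
import scala.concurrent.duration._

val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)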

val result = RestartSource
  .onFailuresWithBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ) { () =>
    Consumer
      .plainSource(consumerSettings, Subscriptions.topics(topic))
      // this is a hack to get access to the Consumer.Control
      // instances of the latest Kafka Consumer source
      .mapMaterializedValue(c => control.set(c))
      .via(businessFlow)
  }
  .runWith(Sink.seq)

control.get().shutdown()

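This solution only works if the broker is down when you start the stream, because the consumer throws an exception when you try to create it. However, if you create the consumer successfully and the whole Kafka cluster then crashes, the internal KafkaConsumer will reconnect using the reconnect.backoff.ms and reconnect.backoff.max.ms settings mentioned above, and your stream will not fail.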
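
If the stream should fail in that second case as well, one option is the standard Akka Streams `idleTimeout` operator, which fails a stream with a `TimeoutException` when no element passes for the given duration. This is only a sketch under assumptions: `IdleGuard` and `failWhenIdle` are hypothetical names, and the timeout cannot tell an unreachable broker from a topic that is simply quiet, so it suits only topics with steady traffic.

```scala
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

// Hypothetical helper built on the standard idleTimeout operator.
object IdleGuard {
  // Wraps any source so that it fails with a TimeoutException
  // when no element passes through for maxIdle.
  def failWhenIdle[T, M](source: Source[T, M], maxIdle: FiniteDuration): Source[T, M] =
    source.idleTimeout(maxIdle)
}

// Applied to the consumer from the question (consumerSettings as defined there):
//   IdleGuard
//     .failWhenIdle(
//       Consumer.plainSource(consumerSettings, Subscriptions.topics("TEST-TOPIC")),
//       30.seconds)
//     .runWith(Sink.foreach(println))
```

With this in place, the `Failure` branch of the question's `onComplete` would run after the idle period, where `system.terminate()` can be called. Inside a `RestartSource` factory, the same timeout would trigger a restart with backoff.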

If you want to limit the number of retries, you should do the following:

val result: Future[Done] = RestartSource
  .onFailuresWithBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ) { () =>
    // your consumer source goes here
  }
  .take(3) // retries limit
  .runWith(Sink.ignore)

result.onComplete {
  case _ => println("Max retries reached")
}
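
Note that `take(3)` bounds the number of elements passed downstream rather than the number of restarts. A variant that caps the restarts themselves is sketched below. It assumes an Akka 2.5.x release that has the `maxRestarts` overload of `RestartSource.onFailuresWithBackoff` (newer releases express the same thing through `RestartSettings`), and `BoundedRestart` is a hypothetical name. The Akka documentation describes the cap as applying to restarts within a time frame of `minBackoff`, so the exact count may differ between versions.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{RestartSource, Source}
import scala.concurrent.duration._

// Hypothetical wrapper around the maxRestarts overload (assumes Akka 2.5.x).
object BoundedRestart {
  // Rebuilds the source from `factory` after a failure, backing off between
  // attempts. Once the restart cap is reached, the failure is passed downstream.
  def apply[T](maxRestarts: Int)(factory: () => Source[T, _]): Source[T, NotUsed] =
    RestartSource.onFailuresWithBackoff(
      minBackoff = 3.seconds,
      maxBackoff = 30.seconds,
      randomFactor = 0.2,
      maxRestarts = maxRestarts
    )(factory)
}

// Usage with the consumer from this answer:
//   BoundedRestart(3) { () =>
//     Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
//   }.runWith(Sink.ignore)
```

The resulting `Future` then fails with the last error once the cap is hit, so `onComplete` can match on `Failure` to report that retries were exhausted.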
Answered 2019-04-26T15:07:22.847