0

我正在运行一个 Akka Streams Kafka 应用程序,我想在流消费者上合并监督策略,这样如果代理出现故障,并且流消费者在停止超时后死亡,主管可以重新启动消费者。

这是我的完整代码:

UserEventStream

import akka.actor.{Actor, PoisonPill, Props}
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import akka.util.Timeout
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import akka.pattern.ask
import akka.stream.ActorMaterializer

class UserEventStream extends Actor {

  val settings = Settings(context.system).KafkaConsumers
  implicit val timeout: Timeout = Timeout(10 seconds)
  implicit val materializer = ActorMaterializer()

  override def preStart(): Unit = {
    super.preStart()
    println("Starting UserEventStream....s")
  }
  override def receive = {
    case "start" =>
      val consumerConfig = settings.KafkaConsumerInfo
      println(s"ConsumerConfig with $consumerConfig")
      startStreamConsumer(consumerConfig("UserEventMessage" + ".c" + 1))
  }

  def startStreamConsumer(config: Map[String, String]) = {
    println(s"startStreamConsumer with config $config")

    val consumerSource = createConsumerSource(config)
    val consumerSink = createConsumerSink()
    val messageProcessor = context.actorOf(Props[MessageProcessor], "messageprocessor")

    println("START: The UserEventStream processing")
    val future =
      consumerSource
        .mapAsync(parallelism = 50) { message =>
          val m = s"${message.record.value()}"
          messageProcessor ? m
        }
        .runWith(consumerSink)
    future.onComplete {
      case Failure(ex) =>
        println("FAILURE : The UserEventStream processing, stopping the actor.")
        self ! PoisonPill
      case Success(ex) =>
    }
  }

  def createConsumerSource(config: Map[String, String]) = {
    val kafkaMBAddress = config("bootstrap-servers")
    val groupID = config("groupId")
    val topicSubscription = config("subscription-topic").split(',').toList
    println(s"Subscriptiontopics $topicSubscription")

    val consumerSettings = ConsumerSettings(context.system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers(kafkaMBAddress)
      .withGroupId(groupID)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")

    Consumer.committableSource(consumerSettings, Subscriptions.topics(topicSubscription: _*))
  }

  def createConsumerSink() = {
    Sink.foreach(println)
  }
}  

StreamProcessorSupervisor(这是班级的主管UserEventStream班):

import akka.actor.{Actor, Props}
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.ActorMaterializer
import stream.StreamProcessorSupervisor.StartClient
import scala.concurrent.duration._

object StreamProcessorSupervisor {
  final case object StartSimulator
  final case class StartClient(id: String)
  def props(implicit materializer: ActorMaterializer) =
    Props(classOf[StreamProcessorSupervisor], materializer)
}

class StreamProcessorSupervisor(implicit materializer: ActorMaterializer) extends Actor {
  override def preStart(): Unit = {
    self ! StartClient(self.path.name)
  }

  def receive: Receive = {
    case StartClient(id) =>
      println(s"startCLient with id $id")
      val childProps = Props(classOf[UserEventStream])
      val supervisor = BackoffSupervisor.props(
        Backoff.onFailure(
          childProps,
          childName = "usereventstream",
          minBackoff = 1.second,
          maxBackoff = 1.minutes,
          randomFactor = 0.2
        )
      )
      context.actorOf(supervisor, name = s"$id-backoff-supervisor")
      val userEventStrean = context.actorOf(Props(classOf[UserEventStream]),"usereventstream")
      userEventStrean ! "start"
  }
}

App(主要应用类):

import akka.actor.{ActorSystem, Props}
import akka.stream.ActorMaterializer

object App extends App {

  implicit val system = ActorSystem("stream-test")
  implicit val materializer = ActorMaterializer()

  system.actorOf(StreamProcessorSupervisor.props,"StreamProcessorSupervisor")
}

application.conf

kafka {

  consumer {

    num-consumers = "1"
    c1 {
      bootstrap-servers = "localhost:9092"
      bootstrap-servers = ${?KAFKA_CONSUMER_ENDPOINT1}
      groupId = "localakkagroup1"
      subscription-topic = "test"
      subscription-topic = ${?SUBSCRIPTION_TOPIC1}
      message-type = "UserEventMessage"
      poll-interval = 50ms
      poll-timeout = 50ms
      stop-timeout = 30s
      close-timeout = 20s
      commit-timeout = 15s
      wakeup-timeout = 10s
      max-wakeups = 10
      use-dispatcher = "akka.kafka.default-dispatcher"
      kafka-clients {
        enable.auto.commit = true
      }
    }
  }
}

运行应用程序后,我故意杀死了 Kafka 代理,然后发现 30 秒后,演员正在通过发送毒丸来停止自己。但奇怪的是,它并没有像BackoffSupervisor策略中提到的那样重新启动。

这里可能是什么问题?

4

1 回答 1

0

您的代码中有两个实例UserEventStream:一个是BackoffSupervisor内部创建的子角色Props,您传递给它,另一个val userEventStreanStreamProcessorSupervisor. "start"当您应该将该消息发送给前者时,您正在将消息发送给后者。

您不需要val userEventStrean,因为BackoffSupervisor创建了子演员。发送给 的消息BackoffSupervisor会转发给孩子,因此要向孩子发送"start"消息,请将其发送到BackoffSupervisor

class StreamProcessorSupervisor(implicit materializer: ActorMaterializer) extends Actor {
  override def preStart(): Unit = {
    self ! StartClient(self.path.name)
  }

  def receive: Receive = {
    case StartClient(id) =>
      println(s"startCLient with id $id")
      val childProps = Props[UserEventStream]
      val supervisorProps = BackoffSupervisor.props(...)
      val supervisor = context.actorOf(supervisorProps, name = s"$id-backoff-supervisor")
      supervisor ! "start"
  }
}

另一个问题是,当一个演员收到一个 时PoisonPill,这与那个演员抛出异常不同。因此,Backoff.onFailureUserEventStream向自身发送一个PoisonPill. APoisonPill停止演员,因此请Backoff.onStop改用:

val supervisorProps = BackoffSupervisor.props(
  Backoff.onStop( // <--- use onStop
    childProps,
    ...
  )
)
val supervisor = context.actorOf(supervisorProps, name = s"$id-backoff-supervisor")
supervisor ! "start"
于 2017-10-06T13:10:42.310 回答