2

当前用于 Actor 类的流实现的播放框架实现是在没有背压的情况下编写的。我想用背压actorRef替换actorRefWithAck. 但是演员流程是用我不熟悉的Scala编写的。任何人都可以解释如何用java编写这个类吗?

ActorFlow类如下:

object ActorFlow {
      /**
       * Create a flow that is handled by an actor.
       *
       * Messages can be sent downstream by sending them to the actor passed into the props function.  This actor meets
       * the contract of the actor returned by [[http://doc.akka.io/api/akka/current/index.html#akka.stream.scaladsl.Source$@actorRef[T](bufferSize:Int,overflowStrategy:akka.stream.OverflowStrategy):akka.stream.scaladsl.Source[T,akka.actor.ActorRef] akka.stream.scaladsl.Source.actorRef]].
       *
       * The props function should return the props for an actor to handle the flow. This actor will be created using the
       * passed in [[http://doc.akka.io/api/akka/current/index.html#akka.actor.ActorRefFactory akka.actor.ActorRefFactory]]. Each message received will be sent to the actor - there is no back pressure,
       * if the actor is unable to process the messages, they will queue up in the actors mailbox. The upstream can be
       * cancelled by the actor terminating itself.
       *
       * @param props A function that creates the props for actor to handle the flow.
       * @param bufferSize The maximum number of elements to buffer.
       * @param overflowStrategy The strategy for how to handle a buffer overflow.
       */
      def actorRef[In, Out](
          props: ActorRef => Props,
          bufferSize: Int = 16,
          overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew
      )(implicit factory: ActorRefFactory, mat: Materializer): Flow[In, Out, _] = {
        val (outActor, publisher) = Source
          .actorRef[Out](bufferSize, overflowStrategy)
          .toMat(Sink.asPublisher(false))(Keep.both)
          .run()
    
        Flow.fromSinkAndSource(
          Sink.actorRef(
            factory.actorOf(Props(new Actor {
              val flowActor = context.watch(context.actorOf(props(outActor), "flowActor"))
    
              def receive = {
                case Status.Success(_) | Status.Failure(_) => flowActor ! PoisonPill
                case Terminated(_)                         => context.stop(self)
                case other                                 => flowActor ! other
              }
    
              override def supervisorStrategy = OneForOneStrategy() {
                case _ => SupervisorStrategy.Stop
              }
            })),
            Status.Success(())
          ),
          Source.fromPublisher(publisher)
        )
      }
}
4

0 回答 0