1

我的问题与某处有关:访问由 Source.actorRef 创建的 akka 流 Source 的底层 ActorRef 有一些区别:

  1. 我正在使用 akka-stream 实验 1.0
  2. 我正在使用 actorPublisher 模型
  3. 我正在使用 FlowGraph dsl 进行并行处理的流定义

我找不到让 actorRef 向 Source 持有的 Actor Publisher 实例发送消息的方法。

 def run(implicit system: ActorSystem) = {
   import system.dispatcher
   implicit val materializer = ActorMaterializer()

   val source = Source.actorPublisher[TestRequest](TestActor.props).map { request => request.event }

   //Implementation in subpackage
   val sinkLevel1 = Sinks.sinkLevel1 
   val sinkLevel2 = Sinks.sinkLevel2

   //Implementation in subpackage
   val stageTriage = FlowStages.stageTriage    
   val stageEvalProcess1 = FlowStages.stageEvalProcess1
   val stageEvalProcess2 = FlowStages.stageEvalProcess2

   val pipeline = FlowGraph.closed(){ implicit builder => 
     import FlowGraph.Implicits._

     val stageDispatchByRuleLevels = builder.add(Broadcast[TriagedSystemEvent](2))

     source ~> stageTriage ~> stageDispatchByRuleLevels
                              stageDispatchByRuleLevels ~> stageEvalProcess1 ~> sinkLevel1
                              stageDispatchByRuleLevels ~> stageEvalProcess2 ~> sinkLevel2

   }

   pipeline.run()

 }

感谢帮助 !

奥利弗

4

1 回答 1

1

根据链接问题中诺亚的回答,如果您添加

val ref = pipeline.run()

然后,您可以向 ref 发送消息,例如

ref ! ...
于 2015-09-12T17:08:19.733 回答