我的问题与某处有关:访问由 Source.actorRef 创建的 akka 流 Source 的底层 ActorRef 有一些区别:
- 我正在使用 akka-stream 实验 1.0
- 我正在使用 actorPublisher 模型
- 我正在使用 FlowGraph dsl 进行并行处理的流定义
我找不到让 actorRef 向 Source 持有的 Actor Publisher 实例发送消息的方法。
def run(implicit system: ActorSystem) = { import system.dispatcher implicit val materializer = ActorMaterializer() val source = Source.actorPublisher[TestRequest](TestActor.props).map { request => request.event } //Implementation in subpackage val sinkLevel1 = Sinks.sinkLevel1 val sinkLevel2 = Sinks.sinkLevel2 //Implementation in subpackage val stageTriage = FlowStages.stageTriage val stageEvalProcess1 = FlowStages.stageEvalProcess1 val stageEvalProcess2 = FlowStages.stageEvalProcess2 val pipeline = FlowGraph.closed(){ implicit builder => import FlowGraph.Implicits._ val stageDispatchByRuleLevels = builder.add(Broadcast[TriagedSystemEvent](2)) source ~> stageTriage ~> stageDispatchByRuleLevels stageDispatchByRuleLevels ~> stageEvalProcess1 ~> sinkLevel1 stageDispatchByRuleLevels ~> stageEvalProcess2 ~> sinkLevel2 } pipeline.run() }
感谢帮助 !
奥利弗