5

我正在尝试构建和运行一个 akka 流(在 Java DSL 中),其中有 2 个演员作为源,然后是一个合并结点,然后是 1 个接收器:

    Source<Integer, ActorRef> src1 = Source.actorRef(100, OverflowStrategy.backpressure());
    Source<Integer, ActorRef> src2 = Source.actorRef(100, OverflowStrategy.backpressure());
    Sink<Integer, BoxedUnit> sink = Flow.of(Integer.class).to(Sink.foreach(System.out::println));

    RunnableFlow<BoxedUnit> closed = FlowGraph.factory().closed(sink, (b, out) -> {
        UniformFanInShape<Integer, Integer> merge = b.graph(Merge.<Integer>create(2));
        b.from(src1).via(merge).to(out);
        b.from(src2).to(merge);
    });

    closed.run(mat);

我的问题是如何获取对源演员的 ActorRef 引用以便向他们发送消息?如果有 1 个演员,我不会使用图形生成器,然后 .run() 或 runWith() 方法将返回 ActorRef 对象。但是如果有很多源演员怎么办?甚至有可能实现这样的流程吗?

4

1 回答 1

6

回答我自己的问题以防有人需要。

使用 jrudolph 的建议,我能够使用这样的演员(在实际代码中,我做了比 2 个 ActorRefs 列表更好的事情):

    Source<Integer, ActorRef> src1 = Source.actorRef(100, OverflowStrategy.fail());
    Source<Integer, ActorRef> src2 = Source.actorRef(100, OverflowStrategy.fail());
    Sink<Integer, BoxedUnit> sink = Flow.of(Integer.class).to(Sink.foreach(System.out::println));

    RunnableFlow<List<ActorRef>> closed = FlowGraph.factory().closed(src1, src2, (a1, a2) -> Arrays.asList(a1, a2), (b, s1, s2) -> {
        UniformFanInShape<Integer, Integer> merge = b.graph(Merge.<Integer>create(2));
        b.from(s1).via(merge).to(sink);
        b.from(s2).to(merge);
    });

    List<ActorRef> stream = closed.run(mat);
    ActorRef a1 = stream.get(0);
    ActorRef a2 = stream.get(1);
于 2015-05-10T13:25:46.527 回答