3

我创建了ActorPublisher一个源流。我SupervisionStrategy为我的流的 Materializer 设置:

public class TestStream {

static class MyActorPublisher extends AbstractActorPublisher<String> {

    public MyActorPublisher() {
        receive(ReceiveBuilder
                        .match(ActorPublisherMessage.Request.class, request -> publish())
                        .match(ActorPublisherMessage.Cancel.class, cancel -> {
                            context().stop(self());
                        })
                        .build()
        );
    }

    public void publish() {
        if (totalDemand() > 0) {
            throw new RuntimeException(" test exc ");
        }
    }


    @Override
    public SupervisorStrategy supervisorStrategy() {
        return new OneForOneStrategy(2, Duration.create(1, TimeUnit.MINUTES), param -> {
            return SupervisorStrategy.restart();
        });
    }

}

public static void main(String[] args) {
    final ActorSystem actorSystem = ActorSystem.create("actorSystem");
    final ActorMaterializerSettings settings = ActorMaterializerSettings
            .create(actorSystem)
            .withInputBuffer(1, 1024)
            .withSupervisionStrategy(JFunction.func((t) -> {
                System.out.println("Supervisor found an error");
                return Supervision.resume();
            }));
    final Materializer materializer = ActorMaterializer.create(settings, actorSystem);

    final Source<String, ActorRef> benchmarkSource = Source.actorPublisher(
            Props.create(MyActorPublisher.class)
    );

    benchmarkSource.map(item -> {
        System.out.println(item);
        return item + "1";
    })
            .to(Sink.foreach(item -> System.out.println(item)))
            .run(materializer);
}}

但它从不使用 created 策略,何时ActorPublisher抛出异常。我也尝试supervisorStrategy()MyActorPublisher. 但我知道它只用于儿童演员,它不起作用。

4

0 回答 0