I created an ActorPublisher as the source of a stream. I set a SupervisionStrategy on the Materializer for my stream:
public class TestStream {

    static class MyActorPublisher extends AbstractActorPublisher<String> {

        public MyActorPublisher() {
            receive(ReceiveBuilder
                .match(ActorPublisherMessage.Request.class, request -> publish())
                .match(ActorPublisherMessage.Cancel.class, cancel -> {
                    context().stop(self());
                })
                .build()
            );
        }

        // Throws as soon as there is downstream demand, to trigger supervision.
        public void publish() {
            if (totalDemand() > 0) {
                throw new RuntimeException(" test exc ");
            }
        }

        // Actor-level strategy: restart, at most 2 times within 1 minute.
        @Override
        public SupervisorStrategy supervisorStrategy() {
            return new OneForOneStrategy(2, Duration.create(1, TimeUnit.MINUTES), param -> {
                return SupervisorStrategy.restart();
            });
        }
    }

    public static void main(String[] args) {
        final ActorSystem actorSystem = ActorSystem.create("actorSystem");

        // Stream-level strategy: log the error and resume.
        final ActorMaterializerSettings settings = ActorMaterializerSettings
            .create(actorSystem)
            .withInputBuffer(1, 1024)
            .withSupervisionStrategy(JFunction.func((t) -> {
                System.out.println("Supervisor found an error");
                return Supervision.resume();
            }));
        final Materializer materializer = ActorMaterializer.create(settings, actorSystem);

        final Source<String, ActorRef> benchmarkSource = Source.actorPublisher(
            Props.create(MyActorPublisher.class)
        );

        benchmarkSource.map(item -> {
                System.out.println(item);
                return item + "1";
            })
            .to(Sink.foreach(item -> System.out.println(item)))
            .run(materializer);
    }
}
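But the strategy I created is never used when the ActorPublisher throws an exception. I also tried overriding supervisorStrategy() in MyActorPublisher, but as far as I know that strategy applies only to child actors, and it did not work either.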