我正在通过 ActorRefFactory 创建子 Actor,以便可以注入依赖。我为这些子 Actor 配置了 OneForOneStrategy(一对一)监督策略,并将其设置为 Resume;但是当它们抛出异常时,我在日志中看不到监督策略(strategy)按照我的设置生效。
这是我的代码
// Resolve the ActorSystem from the injector.
val system = actorInjector.getInstance(classOf[ActorSystem])

// Factory that creates the write actor under whichever ActorRefFactory it is
// handed (the parent's context), so the parent becomes its supervisor.
val eventWriteActor = { (factory: ActorRefFactory) =>
  val props = Props(new EventWriteActor(insertService))
  factory.actorOf(props, "eventWriteActor")
}

// Child factories, in the order ActorManager indexes them: write actor, then query actor.
val actorsList: List[ActorRefFactory => ActorRef] =
  eventWriteActor :: eventQueryActor :: Nil

// Top-level parent actor that instantiates the children from the factories above.
val actorManager: ActorRef =
  system.actorOf(Props(new ActorManager(actorsList)), name = "ActorManager")
ActorManager.scala // 父 Actor
/**
 * Parent actor that creates its children from the supplied factories and
 * supervises them with a one-for-one strategy that always resumes.
 *
 * NOTE: Akka consults the decider only when a child actor throws while it is
 * processing a message. A failure that a child merely reports by sending
 * `akka.actor.Status.Failure` to a requester is an ordinary message and never
 * reaches this strategy, so the log lines below will not appear for it.
 *
 * @param childMaker child factories; index 0 is used for the event-write actor
 *                   and index 1 for the event-query actor. A missing index
 *                   leaves the corresponding child as `None`.
 */
class ActorManager(childMaker: List[ActorRefFactory => ActorRef]) extends Actor {
  // Used both as the retry limit and as the length (in seconds) of the retry window.
  val seconds = 10

  override val supervisorStrategy = OneForOneStrategy(
    maxNrOfRetries = seconds, withinTimeRange = seconds.seconds) {
    case mex: MongoException =>
      log.error("Got some MongoException, Supervision Strategy says Resume exception is {}", mex)
      Resume
    case e: Exception =>
      log.error("Got some Exception, Supervision Strategy says Resume exception is {}", e)
      Resume
  }

  // `lift` yields None for an absent index, so a list with fewer than two
  // factories no longer throws IndexOutOfBoundsException during construction.
  var eventWriteActor: Option[ActorRef] = childMaker.lift(0).map(make => make(context))
  var eventQueryActor: Option[ActorRef] = childMaker.lift(1).map(make => make(context))

  /** Forwards insert requests to the write child and pipes its reply back to the requester. */
  def receive: PartialFunction[Any, Unit] = {
    case InsertEvent(event) =>
      eventWriteActor match {
        case Some(writeActor) =>
          ask(writeActor, InsertEventInMongo(event)).pipeTo(sender())
        case None =>
          log.info("eventWriteActor is empty")
      }
  }
}
子 Actor
/**
 * Child actor that hands insert requests to the injected service.
 *
 * The service call itself returns `Unit`; the outcome is delivered later to the
 * `ActorRef` passed along with the event, not returned from `receive`.
 *
 * @param insertEventServiceTrait service that performs the insert
 */
class EventWriteActor @Inject() (insertEventServiceTrait: InsertEventServiceTrait) extends Actor {
  def receive: PartialFunction[Any, Unit] = {
    case InsertEventInMongo(event) =>
      // Capture the requester in a local value: the original assigned to a
      // `senderRef` member that is not declared in this class, and a local also
      // keeps the reference tied to this specific message.
      val senderRef: Option[ActorRef] = Some(sender())
      insertEventServiceTrait.insertEventInMongo(event, senderRef)
  }
}
服务类
/**
 * Service layer that delegates event insertion to the injected repository.
 *
 * @param eventRepo repository that receives the insert call
 */
class InsertEventService @Inject()(eventRepo: EventRepository) extends InsertEventServiceTrait {

  /**
   * Passes `event` and the optional reply target straight to the repository.
   *
   * @param event     the event to insert
   * @param senderRef actor to notify about the outcome, if any
   */
  override def insertEventInMongo(event: Event, senderRef: Option[ActorRef]): Unit =
    // The original referenced `eventRepository`, which does not match the
    // constructor parameter `eventRepo`.
    eventRepo.insertEvent(event, senderRef)
}
存储库类
/**
 * Repository implementation that forwards inserts to an `EventInsert` built
 * from the injected Mongo factory.
 *
 * @param mongodb factory handed to the underlying `EventInsert`
 */
class EventRepositoryImpl @Inject()(mongodb: MongoFactoryTrait) extends EventRepository {

  val eventInsert = new EventInsert(mongodb)

  /** Delegates to `EventInsert.eventInsert` with the same arguments. */
  override def insertEvent(event: Event, senderRef: Option[akka.actor.ActorRef]): Unit = {
    val inserter = eventInsert
    inserter.eventInsert(event, senderRef)
  }
}
领域类
class EventInsert(mongoFactoryTrait: MongoFactoryTrait) {
/**
 * Inserts `event` into the events collection and reports the outcome to
 * `senderRef` from the observer callbacks.
 *
 * On completion the requester receives `true`; on error it receives
 * `akka.actor.Status.Failure` wrapping the error. Failures are therefore
 * delivered as messages and are not thrown out of this method by the callbacks.
 *
 * @param event     the event to persist
 * @param senderRef actor to notify; when `None` a warning is logged instead
 */
def eventInsert(event: Event, senderRef: Option[akka.actor.ActorRef]): Unit = {
  // Sends `message` to the requester, or warns when there is nobody to notify.
  def reply(message: Any): Unit = senderRef match {
    case Some(ref) => ref ! message
    case None => log.warn("insertEvent: Actor reference is null")
  }

  val document = insertDocument(event)
  val collection: MongoCollection[Document] = mongoFactoryTrait.collectionMongo(Event_COLLECTION_NAME)
  val insertionResult: SingleObservable[Completed] = collection.insertOne(document)

  insertionResult.subscribe(new Observer[Completed] {
    override def onNext(result: Completed): Unit =
      log.info("insertEvent: Event successfully inserted")

    override def onError(e: Throwable): Unit = {
      e match {
        case mongoEx: MongoException =>
          log.error("insertEvent: Mongoexception in inserting Event", mongoEx)
        case _ =>
          // Covers every other Throwable; the original matched only Exception,
          // so anything else raised a MatchError and the requester got no reply.
          log.debug("insertEvent: Event record not inserted, isInserted ={}", false)
      }
      reply(akka.actor.Status.Failure(e))
    }

    // The original flag was always true here, so the reply is the literal value.
    override def onComplete(): Unit = reply(true)
  })
}
我插入了一个具有相同键(_id)的文档,以便代码抛出 Mongo 重复键(duplicate key)异常,从而测试上述代码。但下面是我得到的日志,其中没有显示父 Actor 的监督策略被触发:
17:27:30.500 9358 [InnocuousThread-7] operation DEBUG - Unable to retry operation INSERT due to error "com.mongodb.MongoBulkWriteException: Bulk write operation error on server localhost:27017. Write errors: [BulkWriteError{index=0, code=11000, message='E11000 duplicate key error collection: myprojectdb.Event index: _id_ dup key: { _id: "123" }', details={}}]. "
17:27:30.505 9363 [InnocuousThread-7] EventInsert ERROR - insertEvent: Mongoexception in inserting Event
com.mongodb.MongoWriteException: E11000 duplicate key error collection: myprojectdb.Event index: _id_ dup key: { _id: "123" }
at com.mongodb.async.client.MongoCollectionImpl$10.onResult(MongoCollectionImpl.java:1140)
at com.mongodb.async.client.MongoCollectionImpl$10.onResult(MongoCollectionImpl.java:1127)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
at com.mongodb.async.client.OperationExecutorImpl$2$1$1.onResult(OperationExecutorImpl.java:140)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
at com.mongodb.operation.OperationHelper$ConnectionReleasingWrappedCallback.onResult(OperationHelper.java:432)
at com.mongodb.operation.MixedBulkWriteOperation.addBatchResult(MixedBulkWriteOperation.java:525)
at com.mongodb.operation.MixedBulkWriteOperation.access$1600(MixedBulkWriteOperation.java:72)
at com.mongodb.operation.MixedBulkWriteOperation$6.onResult(MixedBulkWriteOperation.java:507)
at com.mongodb.operation.MixedBulkWriteOperation$6.onResult(MixedBulkWriteOperation.java:479)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:253)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
at com.mongodb.internal.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:85)
at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:467)
at com.mongodb.internal.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:111)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:399)
at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:376)
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:677)
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:644)
at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:514)
at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:511)
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220)
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203)
at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127)
at java.base/sun.nio.ch.Invoker.invokeDirect(Invoker.java:158)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:560)
at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:277)
at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:298)
at com.mongodb.internal.connection.AsynchronousSocketChannelStream$AsynchronousSocketChannelAdapter.read(AsynchronousSocketChannelStream.java:137)
at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:105)
at com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:511)
at com.mongodb.internal.connection.InternalStreamConnection.access$1000(InternalStreamConnection.java:76)
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:634)
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:619)
at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:514)
at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:511)
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220)
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203)
at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishRead(UnixAsynchronousSocketChannelImpl.java:437)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:191)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:213)
at java.base/sun.nio.ch.EPollPort$EventHandlerTask.run(EPollPort.java:306)
at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
at java.base/jdk.internal.misc.InnocuousThread.run(InnocuousThread.java:134)