0

我通过 ActorRefFactory 创建子 Actor,以便可以对其进行依赖注入。我为这些子 Actor 配置了 OneForOne(一对一)监督策略,并将其设置为 Resume;但是当它们抛出异常时,我在日志中看不到监督策略(supervisor strategy)按照我的设置生效。

这是我的代码

// Look up the ActorSystem through the injector.
val system = actorInjector.getInstance(classOf[ActorSystem])

// Factory that creates the event-write child under whichever ActorRefFactory it is given.
val eventWriteActor = (factory: ActorRefFactory) =>
  factory.actorOf(Props(new EventWriteActor(insertService)), "eventWriteActor")

// Child factories handed to ActorManager: the write factory first, then the query factory.
val actorsList: List[ActorRefFactory => ActorRef] =
  List(eventWriteActor, eventQueryActor)

// Top-level parent actor; it receives the factories and creates the children itself.
val actorManager: ActorRef =
  system.actorOf(Props(new ActorManager(actorsList)), name = "ActorManager")

ActorManager.scala // 父 Actor

/**
 * Parent actor that creates its children through the supplied factory functions and
 * installs a one-for-one supervisor strategy that logs the failure and resumes the child.
 *
 * NOTE(review): per Akka's supervision model the strategy is consulted only when a child
 * actor throws while processing a message. A failure that a child (or code it calls)
 * reports as an `akka.actor.Status.Failure` message is an ordinary reply and will not show
 * up in the strategy's log output -- confirm against the Akka fault-tolerance docs.
 *
 * @param childMaker factories invoked with this actor's context; index 0 creates the
 *                   event-write child and index 1 the event-query child. A missing entry
 *                   leaves the corresponding field as `None`.
 */
class ActorManager(childMaker: List[ActorRefFactory => ActorRef]) extends Actor {
  // Used both as the retry limit and as the length (in seconds) of the retry window.
  val seconds = 10
  var eventWriteActor: Option[ActorRef] = None
  var eventQueryActor: Option[ActorRef] = None

  override val supervisorStrategy = OneForOneStrategy(
    maxNrOfRetries = seconds, withinTimeRange = seconds.seconds) {
    case mex: MongoException =>
      log.error("Got some MongoException, Supervision Strategy says Resume exception is {}",mex)
      Resume
    case e: Exception =>
      log.error("Got some Exception, Supervision Strategy says Resume exception is {}",e)
      Resume
  }

  // `lift` returns None for an index that is not present, so a list holding fewer than two
  // factories no longer fails the constructor with IndexOutOfBoundsException.
  eventWriteActor = childMaker.lift(0).map(makeChild => makeChild(context))
  eventQueryActor = childMaker.lift(1).map(makeChild => makeChild(context))

  /** Forwards insert requests to the write child and pipes its reply back to the requester. */
  def receive: PartialFunction[Any, Unit] = {
    case InsertEvent(event) =>
      eventWriteActor match {
        case Some(writeActor) =>
          ask(writeActor, InsertEventInMongo(event)).pipeTo(sender())
        // No write child was created; only a log entry is produced and no reply is sent.
        case None => log.info("eventWriteActor is empty")
      }
  }
}

子 Actor

/**
 * Child actor that hands each insert request to the injected service.
 *
 * The actor itself sends no reply; the requester's reference is passed along so that the
 * service layer can answer it.
 *
 * @param insertEventServiceTrait service that performs the insert
 */
class EventWriteActor @Inject() (insertEventServiceTrait:InsertEventServiceTrait) extends Actor {
  def receive: PartialFunction[Any, Unit] = {
    case InsertEventInMongo(event) =>
      // Capture the requester in a local value: the class declares no `senderRef` field,
      // and a per-message value cannot be overwritten by a later message.
      val senderRef = Some(sender())
      insertEventServiceTrait.insertEventInMongo(event, senderRef)
  }
}

服务类

/**
 * Service layer for event inserts; delegates directly to the injected repository.
 *
 * @param eventRepo repository that performs the insert
 */
class InsertEventService @Inject()(eventRepo: EventRepository) extends InsertEventServiceTrait {
  /**
   * Passes the event and the optional requester reference on to the repository.
   *
   * @param event     event to insert
   * @param senderRef actor to be notified of the outcome, if any
   */
  override def insertEventInMongo(event: Event, senderRef: Option[ActorRef]): Unit =
    // Use the constructor parameter `eventRepo`; `eventRepository` is not defined here.
    eventRepo.insertEvent(event, senderRef)
}

存储库类

/**
 * Repository implementation that delegates event inserts to an [[EventInsert]] built from
 * the injected Mongo factory.
 *
 * @param mongodb factory passed to the underlying [[EventInsert]]
 */
class EventRepositoryImpl @Inject()(mongodb: MongoFactoryTrait) extends EventRepository {
  val eventInsert = new EventInsert(mongodb)

  /**
   * Hands the event and the optional requester reference to `eventInsert`.
   *
   * @param event     event to insert
   * @param senderRef actor to be notified of the outcome, if any
   */
  override def insertEvent(event: Event, senderRef: Option[akka.actor.ActorRef]): Unit =
    eventInsert.eventInsert(event, senderRef)
}

领域类

class EventInsert(mongoFactoryTrait: MongoFactoryTrait) {
/**
 * Inserts `event` as a document into the event collection and reports the outcome to
 * `senderRef` from the observer callbacks.
 *
 * On completion the requester receives `true`; on error it receives an
 * `akka.actor.Status.Failure` wrapping the error. Errors are delivered as messages and are
 * never thrown from this method.
 *
 * NOTE(review): because nothing is thrown, an actor that calls this method does not fail,
 * so its parent's supervisor strategy is presumably never consulted for insert errors. To
 * exercise supervision the error would have to be rethrown inside the actor itself.
 *
 * @param event     event to convert to a document and insert
 * @param senderRef actor to notify of the outcome; when `None` only a warning is logged
 */
def eventInsert(event: Event, senderRef: Option[akka.actor.ActorRef]):Unit = {
  // Sends `message` to the requester, or logs a warning when no reference was supplied.
  def reply(message: Any): Unit = senderRef match {
    case Some(ref) => ref ! message
    case None => log.warn("insertEvent: Actor reference is null")
  }

  val document = insertDocument(event)
  val collection: MongoCollection[Document] = mongoFactoryTrait.collectionMongo(Event_COLLECTION_NAME)
  val insertionResult: SingleObservable[Completed] = collection.insertOne(document)

  insertionResult.subscribe(new Observer[Completed] {

    override def onNext(result: Completed): Unit =
      log.info("insertEvent: Event successfully inserted")

    override def onError(e: Throwable): Unit = {
      e match {
        case mongoEx: MongoException =>
          log.error("insertEvent: Mongoexception in inserting Event", mongoEx)
        // Covers every other Throwable, so an unexpected error type can no longer raise a
        // MatchError here and leave the requester without a reply.
        case _ =>
          log.debug("insertEvent: Event record not inserted, isInserted ={}", false)
      }
      reply(akka.actor.Status.Failure(e))
    }

    override def onComplete(): Unit = reply(true)
  })
}

我插入了一个主键(_id)相同的文档,让代码抛出 MongoDB 重复键(duplicate key)异常,以便测试上述代码;但下面是我得到的日志,其中没有任何父 Actor(监督者)的监督策略被触发的记录

17:27:30.500 9358 [InnocuousThread-7] operation DEBUG - Unable to retry operation INSERT due to error "com.mongodb.MongoBulkWriteException: Bulk write operation error on server localhost:27017. Write errors: [BulkWriteError{index=0, code=11000, message='E11000 duplicate key error collection: myprojectdb.Event index: _id_ dup key: { _id: "123" }', details={}}]. "
17:27:30.505 9363 [InnocuousThread-7] EventInsert ERROR - insertEvent: Mongoexception in inserting Event
com.mongodb.MongoWriteException: E11000 duplicate key error collection: myprojectdb.Event index: _id_ dup key: { _id: "123" }
    at com.mongodb.async.client.MongoCollectionImpl$10.onResult(MongoCollectionImpl.java:1140)
    at com.mongodb.async.client.MongoCollectionImpl$10.onResult(MongoCollectionImpl.java:1127)
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
    at com.mongodb.async.client.OperationExecutorImpl$2$1$1.onResult(OperationExecutorImpl.java:140)
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
    at com.mongodb.operation.OperationHelper$ConnectionReleasingWrappedCallback.onResult(OperationHelper.java:432)
    at com.mongodb.operation.MixedBulkWriteOperation.addBatchResult(MixedBulkWriteOperation.java:525)
    at com.mongodb.operation.MixedBulkWriteOperation.access$1600(MixedBulkWriteOperation.java:72)
    at com.mongodb.operation.MixedBulkWriteOperation$6.onResult(MixedBulkWriteOperation.java:507)
    at com.mongodb.operation.MixedBulkWriteOperation$6.onResult(MixedBulkWriteOperation.java:479)
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
    at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:253)
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
    at com.mongodb.internal.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:85)
    at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:467)
    at com.mongodb.internal.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:111)
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
    at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:399)
    at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:376)
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:677)
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:644)
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:514)
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:511)
    at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220)
    at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203)
    at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127)
    at java.base/sun.nio.ch.Invoker.invokeDirect(Invoker.java:158)
    at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:560)
    at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:277)
    at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:298)
    at com.mongodb.internal.connection.AsynchronousSocketChannelStream$AsynchronousSocketChannelAdapter.read(AsynchronousSocketChannelStream.java:137)
    at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:105)
    at com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:511)
    at com.mongodb.internal.connection.InternalStreamConnection.access$1000(InternalStreamConnection.java:76)
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:634)
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:619)
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:514)
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:511)
    at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220)
    at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203)
    at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127)
    at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishRead(UnixAsynchronousSocketChannelImpl.java:437)
    at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:191)
    at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:213)
    at java.base/sun.nio.ch.EPollPort$EventHandlerTask.run(EPollPort.java:306)
    at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
    at java.base/jdk.internal.misc.InnocuousThread.run(InnocuousThread.java:134)
4

0 回答 0