21

我有一个 java 对象,它不是使用 actorSelection(Path)) 从演员系统中选择演员的演员。系统中可能不存在所选参与者。

在 Java Api 中,ActorSelection 不存在 ask(),因此我无法向 Actor 选择发送和识别消息并使用响应的发送者。

我试图通过演员选择将消息发送给演员然后对死信作出反应来解决问题。但我没有收到任何死信。

如果演员还活着或不存在,我如何检查 ActorSelection?

ActorSystem system = ActorSystem.create("test");

//create test actor
system.actorOf(Props.create(TestActor.class), "testActor");

//add dead letter listener to the system
ActorRef eventBusActor = asys.actorOf(Props.create(EventBusActor.class), "eventbusactor");
system.eventStream().subscribe(eventBusActor, DeadLetter.class);


//This works. The test actor receives the message      
ActorSelection a1 = asys.actorSelection("/user/testActor");
a1.tell("hello", ActorRef.noSender());

//This does not work and does not send dead letters      
ActorSelection a2 = asys.actorSelection("/user/doesnotexist");
a2.tell("hello", ActorRef.noSender());

//Does not compile, because ask needs an ActorRef as first argument
ActorSelection a3 = asys.actorSelection("/user/test");
Future f = Patterns.ask(a3, new Identify(), 1000);
4

5 回答 5

25

我最近发现了 ActorSelection.resolveOne 方法:

val name = "myActor"
implicit val timeout = 5000 // Timeout for the resolveOne call
system.actorSelection(name).resolveOne().onComplete {
  case Success(actor) => actor ! message

  case Failure(ex) =>
    val actor = system.actorOf(Props(classOf[ActorClass]), name)
    actor ! message
}

我仍在调查的一个问题是,定义它的方法可能会被同时调用(来自其他参与者)。因此,如果 resolveOne 调用失败,则可能会出现竞争条件,您尝试创建两次 actor,因为该 actor 仍在创建中。对于您的用例,这可能是也可能不是问题

于 2013-11-07T10:51:13.543 回答
13

It looks like Akka left off support for ActorSelection on the java api for ask. I played with the code a little and I found something that works though. See if this code works for you:

import java.util.concurrent.TimeUnit;

import scala.concurrent.Await;
import scala.concurrent.Future;

import akka.actor.ActorIdentity;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Identify;
import akka.actor.Props;
import akka.pattern.AskableActorSelection;
import akka.util.Timeout;

public class AskTest {

  public static void main(String[] args) throws Exception{
    ActorSystem sys = ActorSystem.apply("test");
    sys.actorOf(Props.create(TestActor.class), "mytest");

    ActorSelection sel = sys.actorSelection("/user/mytest");

    Timeout t = new Timeout(5, TimeUnit.SECONDS);
    AskableActorSelection asker = new AskableActorSelection(sel);
    Future<Object> fut = asker.ask(new Identify(1), t);
    ActorIdentity ident = (ActorIdentity)Await.result(fut, t.duration());
    ActorRef ref = ident.getRef();
    System.out.println(ref == null);
  }
}

I just looked at how the scala ask support worked and hooked into it via java. This worked for me; I'm hoping it works for you.

于 2013-08-02T12:00:49.683 回答
7

Akka 提供了ActorRef一个ActorSelection使用特殊消息来获取消息的功能Identify。您不必使用ask()此消息。只需将识别消息传递给 ActorSelection 并收听ActorIdentity将传回给您的消息。在 Akka 文档中有一个关于此的示例:通过 Actor Selection (Java) 识别 Actor

此代码取自示例并进行了修改:

final String identifyId = "1";

@Override
public void onReceive(Object message) {
    if (message instanceof ActorIdentity) {
        ActorIdentity identity = (ActorIdentity) message; 
        if (identity.correlationId().equals(identifyId)) {
            ActorRef ref = identity.getRef();
            if (ref == null)
                // Actor does not exist
            else {
                // Actor does exist
            }
        }
     }
}

还有一个非常漂亮的图形显示了文档中的 ActorPath、ActorSelection 和 Actor 生命周期之间的关系。

于 2013-08-02T11:20:06.257 回答
4

正如其他答案所指出的,ActorSelection.resolveOne()处理这个。

一个警告:在后台,这是通过向相关参与者发送消息来实现的。这意味着如果那个演员很忙,它不会回复,这会失败(超时)。

在纯粹的最佳实践 Akka 中,这可能是一个极端情况。在更混合的普通 Java / Akka 设置中,虽然很容易陷入困境。特别是,演员线程中的代码无法找到对该演员的引用。

于 2014-10-23T14:50:35.260 回答
1

使用版本 2.3.4

一些 Scala 示例,也许可以提供帮助

  val zed2 = Akka.system().actorSelection("path")
  val fs:FiniteDuration = (100).millis

  val x = zed2.resolveOne(fs).value
  if (x.isDefined){
    println(x.get.isFailure)
  }
于 2015-07-30T23:28:08.603 回答