我正在尝试编写一个非常基本的应用程序,其中消息以更高的速率发送给参与者,参与者以较慢的速率消费消息,然后在一段时间后终止应用程序。当我使用相同的actor系统名称和相同的actor名称以及相同的persistenceId再次运行该应用程序时,我希望看到丢失的消息重播,但它没有发生。
(如果我删除日志和快照位置,它们会在下次运行时使用一些不是 0 字节大小的文件再次创建,所以肯定会发生一些事情。)
Edit1:每当我启动应用程序时,onReceiveRecover 都会收到 RecoveryCompleted 对象。
public class App {
public static void main(String[] args) throws InterruptedException {
System.out.println("Hello World!");
ActorSystem actorSystem = ActorSystem.create("sample-actor-system");
ActorRef sampleActor = actorSystem.actorOf(
Props.create(AkkaWorker.class).withDispatcher(
"akka.actor.test-dispatcher"),
"sample-actor");
System.out.println(actorSystem.settings().config());
int i = 1;
//Run this coe the next time so that nothing is published, and only the replayed messages should be executed by the actor
// Thread.sleep(10000);
// System.exit(0);
while (true) {
String msg = "Hello there" + i;
sampleActor.tell(msg, null);
System.out.println("Published message: " + msg);
i++;
// break;
Thread.sleep(100);
if (i == 20) {
Thread.sleep(10000);
System.exit(0);
}
}
}
}
public class AkkaWorker extends UntypedPersistentActor {
public AkkaWorker() {
}
@Override
public String persistenceId() {
return "sample-id-1";
}
@Override
public void onReceiveCommand(Object message) throws Exception {
System.out.println("In onReceiveCommand");
// TODO Auto-generated method stub
if (message instanceof String) {
message = (String) message;
System.out.println("Received message: " + message);
if (((String) message).equalsIgnoreCase("suicide")) {
System.out.println("killing self");
getContext().stop(getSelf());
}
Thread.sleep(1000);
}
}
@Override
public void onReceiveRecover(Object message) {
System.out.println("In onReceiveRecover");
if (message instanceof String) {
System.out.println(message);
} else {
System.out.println("God knows what: "+ message.toString());
}
}
}
在 application.conf 中,
test-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 1
parallelism-factor = 1.0
parallelism-max = 1
}
throughput = 1
}
persistence {
journal {
max-message-batch-size = 1
leveldb {
dir = "/Users/neeraj/akka-persist/journal"
native = true
}
}
snapshot-store.local.dir = "/Users/neeraj/akka-persist/snapshot"
}