1

我正在尝试编写一个非常基本的应用程序,其中消息以更高的速率发送给参与者,参与者以较慢的速率消费消息,然后在一段时间后终止应用程序。当我使用相同的actor系统名称和相同的actor名称以及相同的persistenceId再次运行该应用程序时,我希望看到丢失的消息重播,但它没有发生。

(如果我删除日志和快照位置,它们会在下次运行时使用一些不是 0 字节大小的文件再次创建,所以肯定会发生一些事情。)

Edit1:每当我启动应用程序时,onReceiveRecover 都会收到 RecoveryCompleted 对象。

public class App {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Hello World!");
        ActorSystem actorSystem = ActorSystem.create("sample-actor-system");
        ActorRef sampleActor = actorSystem.actorOf(
                Props.create(AkkaWorker.class).withDispatcher(
                        "akka.actor.test-dispatcher"),
                "sample-actor");
        System.out.println(actorSystem.settings().config());
        int i = 1;
//Run this coe the next time so that nothing is published, and only the replayed messages should be executed by the actor
//        Thread.sleep(10000);
//        System.exit(0);
        while (true) {
            String msg = "Hello there" + i;
            sampleActor.tell(msg, null);
            System.out.println("Published message: " + msg);
            i++;
            // break;
            Thread.sleep(100);
            if (i == 20) {
                Thread.sleep(10000);
                System.exit(0);
            }
        }
    }
}


public class AkkaWorker extends UntypedPersistentActor {

    public AkkaWorker() {

    }

    @Override
    public String persistenceId() {
        return "sample-id-1";
    }

    @Override
    public void onReceiveCommand(Object message) throws Exception {
        System.out.println("In onReceiveCommand");
        // TODO Auto-generated method stub
        if (message instanceof String) {
            message = (String) message;
            System.out.println("Received message: " + message);
            if (((String) message).equalsIgnoreCase("suicide")) {
                System.out.println("killing self");
                getContext().stop(getSelf());
            }

            Thread.sleep(1000);
        }
    }

    @Override
    public void onReceiveRecover(Object message) {
        System.out.println("In onReceiveRecover");
        if (message instanceof String) {
            System.out.println(message);
        } else {
            System.out.println("God knows what: "+ message.toString());
        }
    }

}

在 application.conf 中,

test-dispatcher {
      type = Dispatcher
      executor = "fork-join-executor"
      fork-join-executor {
        parallelism-min = 1
        parallelism-factor = 1.0
        parallelism-max = 1
      }
      throughput = 1
    }
persistence {
    journal {
      max-message-batch-size = 1
      leveldb {
        dir = "/Users/neeraj/akka-persist/journal"
        native = true
      }
    }
    snapshot-store.local.dir = "/Users/neeraj/akka-persist/snapshot"
  }
4

0 回答 0