0

我在 Spring MVC Web 应用程序中有一个 Spring 服务,它调用一个 Actor 系统来计算一个值。当我在 webapp 上多次触发时,应用程序会启动一个TimeoutException. 只完成了第一个计算。

你能给我一些帮助吗?

谢谢

@Service
public class Service {

    public static final int processors = Runtime.getRuntime().availableProcessors();
    @Value("${Iterations}")
    long numberOfIterations;
    @Value("${constante}")
    double constante;

    ActorSystem system;
    ActorRef master;

    public Serice() {
        // Create an Akka system
        system = ActorSystem.create("ComputationSystem");

        // create the master
        master = system.actorOf(new Props(new UntypedActorFactory() {
            public UntypedActor create() {
                return new Master(constante);
            }
        }));
    }

    @PreDestroy
    public void cleanUp() throws Exception {
        system.shutdown();
    }

    @Override
    public double calculatePrice(double x, double y, double z,
            double ex) {


        // start the calculation
        Work work = new Work(numberOfIterations, x, y, z,
                ex);

        Timeout timeout = new Timeout(Duration.create(60, "seconds"));
        Future<Object> future = ask(master, work, timeout);

        double total = 0;
        try {
            total = (Double) Await.result(future,
                    timeout.duration());
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {

        }

        return total;
    }

}


public class Master extends UntypedActor {
    private final ActorRef workerRouter;
    private double total = 0;
    private int answerReceived = 0;
    private long nbPerThreads;
    private double x;
    private double constante;
    private ActorRef replayTo;

    public Master(final double constante) {
        workerRouter = this.getContext().actorOf(
                new Props(new UntypedActorFactory() {
                    public UntypedActor create() {
                        return new Worker(constante);
                    }
                }).withRouter(new RoundRobinRouter(Algo.processors)),
                "workerRouter");
        this.constante = constante;
    }

    public void onReceive(Object message) {
        if (message instanceof Work) {
            Work work = (Work) message;

            replayTo = getSender();
            nbPerThreads = work.nbIterations / Algo.processors;
            x = work.x / 360.0;

            // Modify the message to give the right to the workers
            work.nbIterations = nbPerThreads;
            work.x = x;

            for (int i = 0; i < Algo.processors; i++) {

                workerRouter.tell(work, getSelf());
            }
            return;
        }

        if (message instanceof Double) {


            Double result = (Double) message;
            total += result;
            if (++answerReceived == Algo.processors) {
                double meanOfPremiums = total / (nbPerThreads * Algo.processors); 

                double result = Math.exp(-constante * x) * meanOfPremiums;

                System.out.println("returning answer :" + message);
                // Return the answer
                replayTo.tell(result, getSelf());
            }
            return;
        }
        unhandled(message);
    }
}
4

3 回答 3

1

将发送者存储在属性中是有问题的。如果另一条 Work 消息在最后一条消息之前到达,这将被覆盖,您的消息将无法正确到达。我的建议是创建一个临时参与者来汇总结果并回复发件人。每次收到 Work 消息时,您都将创建此 Actor,并将它需要回复的发送者作为参数传递给它。当您将作品发送到您的工作路由器时,您只需将这个新演员作为发送者传递。您的工作人员代码没有更改。

这个新的参与者将简单地在其 onReceive 方法中保存您当前用于处理 Double 消息的代码,并context().system().stop(self())在将回复发送给原始发件人后调用。

这种模式应该会引导您找到一个可行的解决方案。

于 2013-10-24T12:42:41.373 回答
1

一切正常!谢谢大家,特别感谢@pushy。

我的代码中有 2 个错误。

首先,参考 replyTo 是原始发件人akka://ComputeSystem/temp/$0d。因此,当 2 个线程(http 调用)使用服务时,第一个未来会过期,因为主服务器从不发送响应。

其次,计算必须在临时 Actor 中完成。我创建了这个通知主人的演员。master 使用临时 Actor 引用启动所有工作程序。当所有工作人员都已回复此参与者时,计算将返回给原始发件人。

于 2013-10-25T09:43:43.880 回答
0

我对 Spring MVC 的工作原理一无所知,但是被Service实例化了多次?

如果是,则您正在以相同的名称多次创建参与者系统;这不是一个好主意。全局实例化它,然后获取对它及其创建的参与者的引用。

于 2013-10-24T10:20:53.427 回答