1

我正在尝试为 Akka实现 Java聚合器,因为它看起来不像Java API 支持它们(为什么不!?

到目前为止,这是我最好的尝试:

// Groovy pseudo-code
abstract class Aggregator<T> extends UntypedActor {
    ActorRef recipient
    Set<T> aggregation
    // TODO: Timer timer (?)

    abstract boolean isAggregated()

    @Override
    void onReceive(Object message) {
        aggregation << message as T

        if(isAggregated()) {
            recipient.tell(new Aggregation(aggregation))    // again, pseudo-code
            aggregation.clear()
            // TODO: timer.reset()
        }
    }
}

缺少的是某种Timer构造,Aggregator如果它尚未聚合,它将在 60 秒后超时。超时时,它应该抛出某种异常。在聚合时,应重置计时器。任何想法如何做到这一点?

4

1 回答 1

1

您正在寻找的是ReceiveTimeout. Akka 提供了一个功能,当特定的 actor 在预定义的时间内没有收到任何东西时,它会有一个超时。

在 Java 中,你会在你的 actor 内部做这样的事情:

getContext().setReceiveTimeout(Duration.create("1 second"));

当这个触发器发送一个类型的消息ReceiveTimeout给actor,然后你可以决定你想要做什么(异常,日志记录,重置......)。

您可以在“接收超时”部分下找到更多信息:http: //doc.akka.io/docs/akka/snapshot/java/untyped-actors.html

另一方面,在 github 上有一些开源库可以做这些事情。查看https://github.com/sksamuel/akka-patterns了解更多示例。

于 2015-06-19T18:25:46.447 回答