我正在尝试为 Akka实现 Java聚合器,因为它看起来不像Java API 支持它们(为什么不!?)
到目前为止,这是我最好的尝试:
// Groovy pseudo-code
abstract class Aggregator<T> extends UntypedActor {
ActorRef recipient
Set<T> aggregation
// TODO: Timer timer (?)
abstract boolean isAggregated()
@Override
void onReceive(Object message) {
aggregation << message as T
if(isAggregated()) {
recipient.tell(new Aggregation(aggregation)) // again, pseudo-code
aggregation.clear()
// TODO: timer.reset()
}
}
}
缺少的是某种Timer
构造,Aggregator
如果它尚未聚合,它将在 60 秒后超时。超时时,它应该抛出某种异常。在聚合时,应重置计时器。任何想法如何做到这一点?