这是我的场景:
我有一个主actor,它接收来自多个子actor的消息。这些消息包含要聚合的数据。在这个聚合逻辑中,如果我使用共享数据结构来收集聚合,我是否需要处理同步问题?
else if(arg0 instanceof ReducedMsg){
ReducedMsg reduced = (ReducedMsg)arg0;
counter.decrementAndGet();
synchronized(finalResult){
finalResult.add((KeyValue<K, V>) reduced.getReduced());
if(counter.get() == 0){
if(checkAndReduce(finalResult)){
finalResult.clear();
}
else{
stop();
latch.countDown();
}
}
}
}
如您所见,我有一个 finalResult,每条消息都将汇总到该结果中,并且在处理逻辑之后,还需要清除集合。
实际上我正在尝试实现的是递归(关联)减少mapreduce。所以我需要保持我假设的同步块?或者 Akka 是否有机会一次执行 onReceive 一个线程?
这种逻辑在小数据集上产生准确和可预测的结果。我的问题是当我的输入数据集有点大时,代码会挂起。我想确定这是因为我的同步块的上下文切换,所以我可能会深入到不同的设计中。