6

这是我的场景:

我有一个主actor,它接收来自多个子actor的消息。这些消息包含要聚合的数据。在这个聚合逻辑中,如果我使用共享数据结构来收集聚合,我是否需要处理同步问题?

else if(arg0 instanceof ReducedMsg){

                           ReducedMsg reduced = (ReducedMsg)arg0;
        counter.decrementAndGet();

        synchronized(finalResult){

            finalResult.add((KeyValue<K, V>) reduced.getReduced());

            if(counter.get() == 0){
                                    if(checkAndReduce(finalResult)){

                    finalResult.clear();
                }
                else{
                    stop();
                    latch.countDown();
                }

            }

        }



    }

如您所见,我有一个 finalResult,每条消息都将汇总到该结果中,并且在处理逻辑之后,还需要清除集合。

实际上我正在尝试实现的是递归(关联)减少mapreduce。所以我需要保持我假设的同步块?或者 Akka 是否有机会一次执行 onReceive 一个线程?

这种逻辑在小数据集上产生准确和可预测的结果。我的问题是当我的输入数据集有点大时,代码会挂起。我想确定这是因为我的同步块的上下文切换,所以我可能会深入到不同的设计中。

4

1 回答 1

19

onReceive()永远不会同时调用。这是 Akka 给你的最根本的保证。

这意味着,如果您的counter变量是演员中的一个字段,并且没有其他代码可以直接访问该字段,您可以安全地使用普通int/long代替AtomicInteger/ AtomicLong。假设同步finalResult是一个封装并隐藏在参与者中的字段,则不需要同步。

最后的用法CountDownLatch是可疑的。在 Akka 应用程序中,您不应该使用任何同步原语。Actor 本质上是单线程的,所有的通信(包括唤醒和传递数据)都应该通过消息传递来实现。

这在文档中都有解释:http: //doc.akka.io/docs/akka/2.0.2/general/jmm.html#Actors_and_the_Java_Memory_Model

于 2012-07-26T16:04:41.967 回答