5

我正在尝试在 Akka 之上实现 MapReduce,并且很幸运地找到了Akka Essentials书的代码。然而,我在这个示例实现中发现了两个主要问题,而且这两个似乎都是基本的并发设计缺陷,顺便说一句,在一本关于 Akka 的书中发现这些缺陷非常令人震惊:

  1. 完成后,客户端将调用shutdown(),但此时不能保证消息通过 WCMapReduceServer。我看到 WCMapReduceServer 在任何时候都只能获取部分数量的客户端消息,然后 WCMapReduceServer 输出[INFO] [06/25/2013 09:30:01.594] [WCMapReduceApp-5] [ActorSystem(WCMapReduceApp)] REMOTE: RemoteClientShutdown@akka://ClientApplication@192.168.224.65:2552意味着客户端shutdown()发生在客户端实际设法刷新所有未决消息之前。在客户端代码第 41 行中,我们看到shutdown()没有先刷新就发生了。Akka 有没有办法在关闭系统之前强制刷新出站消息?

  2. 我已经修复的另一个实际上更大的缺陷是用于向 MapReduce 服务器发出 EOF 信号的方式,即在所有子任务(文件的每一行)都已完成的情况下,主任务(单词文件)已完成。他发送一条特殊的字符串消息DISPLAY_LIST,这条消息以最低优先级排队,参见代码。这里最大的缺陷是即使DISPLAY_LIST优先级最低,如果任何 Map(或 Reduce)任务花费任意时间,DISPLAY_LIST消息将在所有 MapReduce 子任务完成之前通过,因此这个 MapReduce 示例的结果是不确定的,即您可以从每次运行中获得不同的字典。可以通过替换MapActor#onReceive 实现来揭示问题使用以下内容,即任意长一个 Map 步骤:

    public void onReceive(Object message) {
        System.out.println("MapActor -> onReceive(" + message + ")");
        if (message instanceof String) {
            String work = (String) message;
            // ******** BEGIN SLOW DOWN ONE MAP REQUEST
            if ("Thieves! thieves!".equals(work)) {
                try {
                    System.out.println("*** sleeping!");
                    Thread.sleep(5000);
                    System.out.println("*** back!");
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // ******** END SLOW DOWN ONE MAP REQUEST
    
            // perform the work
            List<Result> list = evaluateExpression(work);
    
            // reply with the result
            actor.tell(list);
    
        } else throw new IllegalArgumentException("Unknown message [" + message + "]");
    }
    

进一步阅读这本书会发现:

我们有 Thread.sleep() 因为不能保证消息的处理顺序。第一个 Thread.sleep() 方法确保 在我们发送 Result 消息之前,所有字符串语句消息都被完全处理。

我很抱歉,但Thread.sleep()从来都不是确保任何事情并发的方法。因此,难怪像这样的书最终会在其示例中充满基本的并发缺陷。

4

1 回答 1

1

我已经解决了这两个问题,并将代码迁移到了最新的 Akka 版本 2.2-M3。

第一个问题的解决方案是让 MapReduce 远程 MasterActor 在收到从客户端发送的所有消息后立即发送的 ShutdownInfo 通知。TaskInfo 包含 MapReduce 任务有多少子任务的信息,例如在这种情况下,文本文件中有多少行。

第二个问题的解决方案是发送带有子任务总数的TaskInfo。在这里,AggregatorActor 计算它已处理的子任务的数量,将其与 TaskInfo 进行比较,并在它们匹配时发出作业已完成的信号(当前仅打印一条消息)。

输出中显示了有趣且正确的行为:

  • ClientActor 发送一堆消息,它们是“子任务”。请注意,身份请求模式用于访问远程 MapReduce MasterActor 的 ActorRef。
  • ClientActor 最后发送 TaskInfo 消息,说明之前发送了多少子任务。
  • MasterActor 将 String 消息转发给 MapActor,MapActor 又转发给 ReduceActor
  • 一个 MapActor 是一个冗长的,即具有“盗贼!盗贼!”的内容。这会稍微减慢 MapReduce 的计算速度。
  • 同时 MasterActor 接收到 TaskInfo 最后一条消息,并将 ShudownInfo 发送回 ClientActor
  • ClientActor 运行system.shutdown(),Client 终止。请注意,MapReduce 仍在处理中,客户端关闭不会干扰。
  • 冗长的 MapActor 回来了,消息处理继续。
  • AggregatorActor 接收 TaskInfo 并通过计算子任务确认子任务的总数已完成并发出完成信号。

代码可以从我的存储库中获取: https ://github.com/bravegag/akka-mapreduce-example

随时欢迎反馈。

于 2013-06-25T16:54:19.670 回答