我正在尝试在 Akka 之上实现 MapReduce,并且很幸运地找到了Akka Essentials书的代码。然而,我在这个示例实现中发现了两个主要问题,而且这两个似乎都是基本的并发设计缺陷,顺便说一句,在一本关于 Akka 的书中发现这些缺陷非常令人震惊:
完成后,客户端将调用
shutdown()
,但此时不能保证消息通过 WCMapReduceServer。我看到 WCMapReduceServer 在任何时候都只能获取部分数量的客户端消息,然后 WCMapReduceServer 输出[INFO] [06/25/2013 09:30:01.594] [WCMapReduceApp-5] [ActorSystem(WCMapReduceApp)] REMOTE: RemoteClientShutdown@akka://ClientApplication@192.168.224.65:2552
意味着客户端shutdown()
发生在客户端实际设法刷新所有未决消息之前。在客户端代码第 41 行中,我们看到shutdown()
没有先刷新就发生了。Akka 有没有办法在关闭系统之前强制刷新出站消息?我已经修复的另一个实际上更大的缺陷是用于向 MapReduce 服务器发出 EOF 信号的方式,即在所有子任务(文件的每一行)都已完成的情况下,主任务(单词文件)已完成。他发送一条特殊的字符串消息
DISPLAY_LIST
,这条消息以最低优先级排队,参见代码。这里最大的缺陷是即使DISPLAY_LIST
优先级最低,如果任何 Map(或 Reduce)任务花费任意时间,DISPLAY_LIST
消息将在所有 MapReduce 子任务完成之前通过,因此这个 MapReduce 示例的结果是不确定的,即您可以从每次运行中获得不同的字典。可以通过替换MapActor#onReceive 实现来揭示问题使用以下内容,即任意长一个 Map 步骤:public void onReceive(Object message) { System.out.println("MapActor -> onReceive(" + message + ")"); if (message instanceof String) { String work = (String) message; // ******** BEGIN SLOW DOWN ONE MAP REQUEST if ("Thieves! thieves!".equals(work)) { try { System.out.println("*** sleeping!"); Thread.sleep(5000); System.out.println("*** back!"); } catch (InterruptedException e) { e.printStackTrace(); } } // ******** END SLOW DOWN ONE MAP REQUEST // perform the work List<Result> list = evaluateExpression(work); // reply with the result actor.tell(list); } else throw new IllegalArgumentException("Unknown message [" + message + "]"); }
进一步阅读这本书会发现:
我们有 Thread.sleep() 因为不能保证消息的处理顺序。第一个 Thread.sleep() 方法确保 在我们发送 Result 消息之前,所有字符串语句消息都被完全处理。
我很抱歉,但Thread.sleep()
从来都不是确保任何事情并发的方法。因此,难怪像这样的书最终会在其示例中充满基本的并发缺陷。