5

有几个人和我一起在一个项目上工作,他们一直在试图找出解决这个问题的最佳方法。看起来这应该是经常需要的标准东西,但由于某种原因,我们似乎无法得到正确的答案。

如果我有一些工作要完成并且我向路由器发送一堆消息,我如何知道所有工作何时完成?例如,如果我们正在读取一个 100 万行文件的行并将该行发送给 actor 进行处理,而您需要处理下一个文件,但必须等待第一个文件完成,您怎么知道它何时完成已经完成?

进一步的评论。我知道并且已经使用了与 Patters.ask() 一起使用的 Await.result() 和 Await.ready()。一个区别是,每一行都会有一个 Future,我们将有一大堆这样的 future 等待,而不仅仅是一个。此外,我们正在填充一个占用大量内存的大型域模型,并且不希望添加额外的内存来在内存中保存相等数量的等待组合的期货,同时使用每个演员在完成工作后完成而不是等待内存要组成。

我们使用的是 Java 而不是 Scala。

伪代码:

for(File file : files) {
    ...
    while((String line = getNextLine(fileStream)) != null) {
        router.tell(line, this.getSelf());
    }
    // we need to wait for this work to finish to do the next
    // file because it's dependent on the previous work
}

看起来你经常想做很多工作,并知道什么时候完成演员。

4

2 回答 2

4

我相信我有一个解决方案给你,它不涉及积累一大堆Futures。一是高层概念。将有两个参与者参与此流程。我们将首先调用FilesProcessor. 这个演员将是短暂的和有状态的。每当您想按顺序处理一堆文件时,您都可以启动该 actor 的一个实例并向其传递一条包含您要处理的文件的名称(或路径)的消息。当它完成所有文件的处理后,它会自行停止。我们将调用的第二个演员LineProcessor。这个actor是无状态的、长期存在的并且集中在路由器后面。它处理一个文件行,然后响应请求行处理的任何人,告诉他们它已经完成了该行的处理。现在进入代码。

首先是消息:

public class Messages {

  public static class ProcessFiles{
    public final List<String> fileNames;
    public ProcessFiles(List<String> fileNames){
      this.fileNames = fileNames;
    }
  }

  public static class ProcessLine{
    public final String line;
    public ProcessLine(String line){
      this.line = line;
    }
  }

  public static class LineProcessed{}

  public static LineProcessed LINE_PROCESSED = new LineProcessed();
}

FilesProcessor

public class FilesProcessor extends UntypedActor{
  private List<String> files;
  private int awaitingCount;
  private ActorRef router;

  @Override
  public void onReceive(Object msg) throws Exception {
    if (msg instanceof ProcessFiles){      
      ProcessFiles pf = (ProcessFiles)msg;
      router = ... //lookup router;
      files = pf.fileNames;
      processNextFile();
    }
    else if (msg instanceof LineProcessed){
      awaitingCount--;
      if (awaitingCount <= 0){
        processNextFile();
      }
    }

  }

  private void processNextFile(){
    if (files.isEmpty()) getContext().stop(getSelf());
    else{            
      String file = files.remove(0);
      BufferedReader in = openFile(file);
      String input = null;
      awaitingCount = 0;

      try{
        while((input = in.readLine()) != null){
          router.tell(new Messages.ProcessLine(input), getSelf());
          awaitingCount++;
        }        
      }
      catch(IOException e){
        e.printStackTrace();
        getContext().stop(getSelf());
      }

    }
  }

  private BufferedReader openFile(String name){
    //do whetever to load file 
    ...
  }

}

LineProcessor

public class LineProcessor extends UntypedActor{

  @Override
  public void onReceive(Object msg) throws Exception {
    if (msg instanceof ProcessLine){
      ProcessLine pl = (ProcessLine)msg;

      //Do whatever line processing...

      getSender().tell(Messages.LINE_PROCESSED, getSelf());
    }
  }

}

现在线路处理器正在发回没有附加内容的响应。如果您需要根据线路的处理发回一些东西,您当然可以更改此设置。我确信这段代码不是防弹的,我只是想向你展示一个高级概念,告诉你如何在没有请求/响应语义和Futures 的情况下完成这个流程。

如果您对此方法有任何疑问或想了解更多详细信息,请告诉我,我很乐意提供。

于 2013-07-12T12:01:03.037 回答
0

在路由上使用context.setRecieveTimeout以将消息发送回发送者,其中包含已处理消息的计数。当处理的消息总数==发送的数量时,您就完成了。

如果您的路由将保持足够繁忙而setReceiveTimeout不会经常触发,请安排您自己的消息以将计数发送回来。

于 2013-07-12T16:36:09.073 回答