2

请注意:我是一名 Java 开发人员,没有 Scala 的工作知识(很遗憾)。我会要求答案中提供的任何代码示例都将使用 Akka 的 Java API。

我对 Akka 和演员来说是全新的,并且正在尝试建立一个相当简单的演员系统:

在此处输入图像描述

因此,DataSplitteractor 运行并将相当大的二进制数据块(例如 20GB)拆分为 100 KB 的块。对于每个块,数据都存储在DataCache通过DataCacher. 在后台,aDataCacheCleaner翻阅缓存并找到可以安全删除的数据块。这就是我们防止缓存变为 20GB 大小的方法。

在将块发送到DataCacher缓存后,DataSplitter然后通知ProcessorPool现在需要处理的块。这ProcessorPool是一个路由器/池,由数万个不同的ProcessorActors. 当每个ProcessActor人收到“处理”100KB 数据块的通知时,它就会从 中获取数据DataCacher并对其进行一些处理。

如果你想知道我为什么还要在这里缓存任何东西(因此是DataCacher,DataCacheDataCacheCleaner),我的想法是 100KB 仍然是一个相当大的消息,可以传递给数以万计的 Actor 实例(100KB * 1,000 = 100MB),所以我试图只存储一次 100KB 块(在缓存中),然后让每个参与者通过缓存 API 引用来访问它。

还有一个Mailmanactor 订阅事件总线并拦截所有DeadLetters.

因此,总共有 6 个演员:

  • DataSplitter
  • DataCacher
  • DataCacheCleaner
  • ProcessorPool
  • ProcessorActor
  • Mailman

Akka 文档宣扬您应该基于划分子任务而不是纯粹按功能分解您的参与者系统,但我并没有完全了解这在此处是如何应用的。手头的问题是我正在尝试在这些参与者之间组织一个主管层次结构,我不确定最好/正确的方法是什么。显然ProcessorPool是一个需要成为 的父/主管的路由器ProcessorActors,所以我们有这个已知的层次结构:

/user/processorPool/
    processorActors

但除了那种已知/明显的关系,我不知道如何组织我的其他演员。我可以让他们在一个共同/主要演员下成为“同行”:

/user/master/
    dataSplitter/
    dataCacher/
    dataCacheCleaner/
    processorPool/
        processorActors/
    mailman/

或者我可以省略一个master(根)actor并尝试使缓存周围的东西更加垂直:

/user/
    dataSplitter/
    cacheSupervisor/
        dataCacher/
        dataCacheCleaner/
    processorPool/
        processorActors/
    mailman/

对 Akka 如此陌生,我只是不确定最好的行动方案是什么,如果有人可以在这里提供一些初步的帮助,我相信灯泡都会打开。而且,与组织这个层次结构同样重要的是,我什至不确定我可以使用哪些 API 构造来实际在代码中创建层次结构

4

1 回答 1

0

将它们组织在一个之下master可以更容易管理,因为您可以watched通过主管访问所有参与者(在本例中master)。

一种分层实现可以是:

大师级监督演员

class MasterSupervisor extends UntypedActor {

private static SupervisorStrategy strategy = new AllForOneStrategy(2,
        Duration.create(5, TimeUnit.MINUTES),

        new Function<Throwable, Directive>() {
            @Override
            public Directive apply(Throwable t) {

                if (t instanceof SQLException) {
                    log.error("Error: SQLException")
                    return restart()
                } else if (t instanceof IllegalArgumentException) {
                    log.error("Error: IllegalArgumentException")
                    return stop()
                } else {
                    log.error("Error: GeneralException")
                    return stop()
                }
            }
        });

@Override
public SupervisorStrategy supervisorStrategy() { return strategy }

@Override
void onReceive(Object message) throws Exception {
     if (message.equals("SPLIT")) {
          // CREATE A CHILD OF MyOtherSupervisor
          if (!dataSplitter) {
              dataSplitter = context().actorOf(FromConfig.getInstance().props(Props.create(DataSplitter.class)), "DataSplitter")

              // WATCH THE CHILD
              context().watch(dataSplitter)

              log.info("${self().path()} has created, watching and sent JobId = ${message} message to DataSplitter")
          }

          // do something with message such as Forward
          dataSplitter.forward(message, context())
      }
}

DataSplitter 演员

class DataSplitter extends UntypedActor {

    // Inject a Service to do the main operation
    DataSplitterService dataSplitterService

    @Override
    void onReceive(Object message) throws Exception {
        if (message.equals("SPLIT")) {
            log.info("${self().path()} recieved message: ${message} from ${sender()}")
            // do something with message such as Forward
            dataSplitterService.splitData()
        }
    }
}
于 2015-12-02T14:26:22.277 回答