1

我知道 Akka 演员不应该为了保持对消息的反应而阻塞,但是我如何构建我的服务来监控一个无限期运行的进程?

例如,我们正在使用 Amazon Kinesis 连接器库。您创建一个具有给定配置的连接器,该配置继承自 Runnable,然后调用 Run() 方法。连接器只是无限期地运行,从 Kinesis 中提取数据,并将其写入 Amazon S3。事实上,如果runnable返回,那就是一个错误,需要重新启动。

方法 (1) 将简单地为每个运行的 Kinesis 连接器创建一个子 Actor,如果 Run() 方法返回,则抛出异常,监督 Actor 会注意到异常并重新启动子 Actor。每个线程每个子actor一个连接器。

方法 (2) 是让子 Actor 将 Kinesis 连接器包装在 Future 中,如果未来返回,则 Actor 将在另一个 Future 中重新启动连接器。可以想象一个参与者可以管理多个连接器,但这是否意味着每个 Future 都在单独的线程中执行?

哪种方法最符合 Akka 的哲学,或者还有其他人推荐的方法吗?一般来说,我想发现任何连接器的任何问题,然后重新启动它。总共不会有超过六个连接器并行运行。

4

2 回答 2

5

我会采用方法 1。应该注意的是,尽管参与者默认情况下没有专用线程,但它们共享一个线程池(所谓的调度程序,请参阅:http ://doc.akka.io/docs/akka/2.3 .6/scala/dispatchers.html)。这意味着阻塞本质上是危险的,因为它耗尽了池中的线程,不让其他非阻塞参与者运行(因为被阻塞的参与者不会将线程放回池中)。因此,您应该将阻塞调用分离到一个固定大小的专用参与者池中,并且您应该为这些参与者分配一个 PinnedDispatcher。后一步确保这些参与者不会相互干扰(他们每个人都有一个专用线程)并确保这些参与者不会干扰系统的其余部分(所有其他参与者将在另一个调度程序上运行,通常默认为-调度员)。一定要限制在 PinnedDispatcher 上运行的 Actor 的数量,因为使用的线程数会随着该调度器上的 Actor 数量而增长。

于 2014-10-28T11:15:53.553 回答
1

在您的两个选项中,我会说 1 更合适。No.2 的缺点是,为了退出未来的 monad 世界,您需要在某处调用 Await,并且您需要指定一个最大持续时间,在您的情况下,这没有意义。

也许您可以在尝试之前考虑其他选择,这很难。一些可能激发您灵感的关键词是流和分布式频道。

于 2014-10-28T11:12:26.353 回答