问题标签 [akka-dispatcher]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
349 浏览

scala - 使用 Akka Dispatchers 处理 Futures

我有一个基于喷雾的 HTTP 服务。我有一个在这个 HTTP 应用程序中运行的流。现在因为这个流做了很多 I/O,我决定使用一个单独的线程池。我查阅了 Akka 文档,看看我能做些什么来使我的线程池是可配置的。我在 Akka 中遇到了 Dispatcher 概念。所以我尝试在我的 application.conf 中使用它,如下所示:

在我的 Actor 中,我尝试将此配置查找为:

当我运行我的服务时,我收到以下错误:

我的问题是:

  1. 我创建的这个 io-dispatcher 线程池是否仅用于 Actor 的?我的意图是将此线程池用于我的流,该流由其中一个 Actor 实例化。然后我将此线程池传递给我的流。

  2. 我如何通过从 application.conf 加载调度程序来创建 ExecutionContext?我是否应该使用任何特定的库来读取我的线程池配置并给我一个 ExecutionContext?

0 投票
1 回答
857 浏览

scala - 具有可变大小线程池执行器的 Akka 平衡池

这是我的用例:

我想创建一个大小为 x 的平衡池路由器(x 是在运行时确定的实例/路由的数量),每个路由都有自己的专用线程。每个路由都应该执行阻塞操作。

Akka 文档解释说您不能更改平衡池的调度程序。这消除了使用固定调度程序的选项(这正是我需要的,每个路由大小为 1 的线程池)。

来自Akka 文档

BalancingPool 自动为其路由使用特殊的 BalancingDispatcher - 忽略在 routee Props 对象上设置的任何调度程序。这是为了通过所有路由共享同一个邮箱来实现平衡语义所必需的。

虽然无法更改路由使用的调度程序,但可以微调使用的执行程序。默认情况下,使用fork-join-dispatcher并且可以按照 Dispatchers [AJ:我相信这是一个错字,它们的意思是 fork-join-executor]中的说明进行配置。在期望路由执行阻塞操作的情况下,将其替换为线程池执行器可能会很有用,该线程池执行器显式提示分配的线程数

来自 Akka 文档的配置示例:

池大小与路由数量匹配的线程池执行器似乎可以工作,但问题是:我如何动态(在运行时)设置我的线程池执行器的池大小以匹配路由数量,如果必须在配置中明确设置池大小?

此外,是否可以在从配置中提取其路由的执行程序时创建平衡池(在代码中)?即我不想使用配置来定义我的路由器,我想使用:

但以某种方式指定要使用的执行程序(来自配置),使用与 withMailbox() 或 withDispatcher() 相同的样式

由于平衡调度程序是“驱动:java.util.concurrent.ExecutorService”,是否可以创建一个 ExecutorService 并将其传递给平衡池路由器或其调度程序?

0 投票
1 回答
202 浏览

scala - 不占用所有默认调度程序的 Actor 中的阻塞操作

我最近正在学习 Akka Actor。我在 Actor 中阅读了调度员的文档。我很好奇演员中的阻塞操作。文档中的最后一个主题描述了如何解决该问题。我正在尝试重现文档中的示例实验。

这是我的代码:

我只是使用默认调度程序创建一个ActorSystem,所有参与者都依赖于这些调度程序。有BlockingFutureActor一个封装在Future. 这PrintActor只是立即打印一个数字。

在文档的解释中,默认的 dispatchers 会被 Futures占用BlockingFutureActor,导致 .s 的消息阻塞PrintActor。该应用程序卡在某个地方,例如:

不幸的是,我的代码没有被阻止。所有输出PrintActor都顺利显示。但是出现的输出BlockingFutureActor就像挤牙膏一样。我尝试通过 Intellij 的 Debug 监控我的线程信息,我得到: 线程监控

您可能会发现只有两个调度员在睡觉(BlockingFutureActor导致这种情况发生)。其他人正在等待,这意味着他们可以进行新消息传递。

我已经阅读了关于 Actor( page )中阻塞操作的答案。引用说“调度程序实际上是线程池。将两者分开可以保证缓慢的阻塞操作不会使另一个饿死。这种方法通常被称为批量标题,因为它的想法是如果应用程序的一部分发生故障,其余部分仍保持响应。”

默认调度程序是否会为阻塞操作保留一些调度程序?这样即使有很多阻塞操作要求调度程序,系统也可以处理消息。

Akka文档中的实验可以复现吗?是不是我的配置有问题。

感谢您的建议。最良好的祝愿。

0 投票
1 回答
54 浏览

java - How to create BalancingDispatcher using Akka in Java?

I want to scale my application UP and DOWN depending on no of requests from users for this i want to share the same mailbox with all the presently running actors. I think Balancing Dispatcher will solve my problem as per his work stealing algorithm. But I am not able to figure out how to do it. I am new to akka framework. Some code samples about this will be really helpful. I have few questions regarding this,

Thanks in advance

0 投票
1 回答
441 浏览

scala - Akka Streams Reactive Kafka - OutOfMemoryError under high load

I am running an Akka Streams Reactive Kafka application which should be functional under heavy load. After running the application for around 10 minutes, the application goes down with an OutOfMemoryError. I tried to debug the heap dump and found that akka.dispatch.Dispatcher is taking ~5GB of memory. Below are my config files.

Akka version: 2.4.18

Reactive Kafka version: 2.4.18

1.application.conf:

2.build.sbt:

3.Source and Sink actors:

In this case actorA, actorB, and actorC are doing some business logic processing and database interaction. Is there anything I am missing in handling the Akka Reactive Kafka consumers such as commit, error, or throttling configuration? Because looking into the heap dump, I could guess that the messages are piling up.

0 投票
0 回答
225 浏览

scala - 如何在 Akka 中控制演员系统的运行时执行?

我一直在尝试找出一种方法来控制具有某种外部(与演员系统分开)控制器的演员系统内传递的运行时消息。换句话说,给定一个参与者系统(我不想更改):我如何设置一种控制器来控制其中的消息传递?

例如,假设给定的 Actor 系统具有以下设置:

我想完成以下任务:

  • 在单个线程上同步运行此程序
  • 对于系统内传递的每条消息:检查内容、发送者和接收者,并以此为基础;执行一些逻辑。

在上面的示例中,我想要一个“控制器”,它可以执行以下操作:

  1. 演员 A 从外部收到消息“开始”,并向演员 B 发送“消息”
  2. 在控制器上执行一些阻塞逻辑,即actor系统将空闲地等待这个逻辑被执行。
  3. 现在已经执行了逻辑,控制器向参与者系统发送绿灯以恢复消息传递。
  4. 演员 B 收到“消息”并打印“一些逻辑”
  5. 控制器检查参与者系统是否终止,它是,并执行一些额外的逻辑。

简而言之,我希望外部控制器能够在运行时控制 Actor 系统内的消息传递。

我在想这个控制器可能会使用调度程序、路由器参与者逻辑和期货来实现。我在 Akka 文档中没有找到任何关于此的示例,那么这甚至可以实现吗?

0 投票
3 回答
688 浏览

scala - 为什么在 Akka Dispatcher 上启动时 Futures 中的 Futures 顺序运行

当我们尝试从参与者的接收方法中启动多个期货时,我们观察到了一种奇怪的行为。如果我们将配置的调度程序用作 ExecutionContext,则期货在同一个线程上按顺序运行。如果我们使用 ExecutionContext.Implicits.global,则期货按预期并行运行。

我们将代码归结为以下示例(更完整的示例如下):

一个可编译的例子是这样的:

我们尝试了 thread-pool-executor 和 fork-join-executor,结果相同。

我们是否以错误的方式使用期货?那么你应该如何产生并行任务呢?

0 投票
1 回答
222 浏览

akka - “throughput-deadline-time”配置选项有什么作用?

我偶然发现了throughput-deadline-timeAkka 调度程序的配置属性,它看起来是一个有趣的选项,但是我可以在整个文档中找到的唯一提及如下:

我认为我们可以同意这不是很有帮助。

那么throughput-deadline-time控制是什么,它对我的​​调度员有什么影响?

0 投票
2 回答
188 浏览

scala - 没有任务时,为什么 Akka 会关闭调度程序?

我想有一个固定的线程池和一旦创建的线程。所以,我创建了自己的ExecutorServiceConfigurator

并使用它:

但是每次,当我的程序没有任何任务时,Akka 都会关闭ExecutorService

akka.dispatch.MessageDispatcher:

我无法理解这种行为。我认为,创建线程是昂贵的操作。

0 投票
1 回答
312 浏览

c# - 如何在 asp.net core akka 中正确配置调度程序

我正在向我的 akka.net 配置添加一个固定调度程序,因为我读到这将及时向我的演员发送消息,但是当我遵循 akka 配置时,我想出了这个

因为我想基于固定调度程序运行它,但目前我在终端中收到此错误

有人使用调度程序并正确配置它吗?您能建议对我的代码进行任何改进吗?请告诉我