问题标签 [akka-dispatcher]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 使用 Akka Dispatchers 处理 Futures
我有一个基于喷雾的 HTTP 服务。我有一个在这个 HTTP 应用程序中运行的流。现在因为这个流做了很多 I/O,我决定使用一个单独的线程池。我查阅了 Akka 文档,看看我能做些什么来使我的线程池是可配置的。我在 Akka 中遇到了 Dispatcher 概念。所以我尝试在我的 application.conf 中使用它,如下所示:
在我的 Actor 中,我尝试将此配置查找为:
当我运行我的服务时,我收到以下错误:
我的问题是:
我创建的这个 io-dispatcher 线程池是否仅用于 Actor 的?我的意图是将此线程池用于我的流,该流由其中一个 Actor 实例化。然后我将此线程池传递给我的流。
我如何通过从 application.conf 加载调度程序来创建 ExecutionContext?我是否应该使用任何特定的库来读取我的线程池配置并给我一个 ExecutionContext?
scala - 具有可变大小线程池执行器的 Akka 平衡池
这是我的用例:
我想创建一个大小为 x 的平衡池路由器(x 是在运行时确定的实例/路由的数量),每个路由都有自己的专用线程。每个路由都应该执行阻塞操作。
Akka 文档解释说您不能更改平衡池的调度程序。这消除了使用固定调度程序的选项(这正是我需要的,每个路由大小为 1 的线程池)。
来自Akka 文档:
BalancingPool 自动为其路由使用特殊的 BalancingDispatcher - 忽略在 routee Props 对象上设置的任何调度程序。这是为了通过所有路由共享同一个邮箱来实现平衡语义所必需的。
虽然无法更改路由使用的调度程序,但可以微调使用的执行程序。默认情况下,使用fork-join-dispatcher并且可以按照 Dispatchers [AJ:我相信这是一个错字,它们的意思是 fork-join-executor]中的说明进行配置。在期望路由执行阻塞操作的情况下,将其替换为线程池执行器可能会很有用,该线程池执行器显式提示分配的线程数
来自 Akka 文档的配置示例:
池大小与路由数量匹配的线程池执行器似乎可以工作,但问题是:我如何动态(在运行时)设置我的线程池执行器的池大小以匹配路由数量,如果必须在配置中明确设置池大小?
此外,是否可以在从配置中提取其路由的执行程序时创建平衡池(在代码中)?即我不想使用配置来定义我的路由器,我想使用:
但以某种方式指定要使用的执行程序(来自配置),使用与 withMailbox() 或 withDispatcher() 相同的样式
由于平衡调度程序是“驱动:java.util.concurrent.ExecutorService”,是否可以创建一个 ExecutorService 并将其传递给平衡池路由器或其调度程序?
scala - 不占用所有默认调度程序的 Actor 中的阻塞操作
我最近正在学习 Akka Actor。我在 Actor 中阅读了调度员的文档。我很好奇演员中的阻塞操作。文档中的最后一个主题描述了如何解决该问题。我正在尝试重现文档中的示例实验。
这是我的代码:
我只是使用默认调度程序创建一个ActorSystem
,所有参与者都依赖于这些调度程序。有BlockingFutureActor
一个封装在Future
. 这PrintActor
只是立即打印一个数字。
在文档的解释中,默认的 dispatchers 会被 Future
s占用BlockingFutureActor
,导致 .s 的消息阻塞PrintActor
。该应用程序卡在某个地方,例如:
不幸的是,我的代码没有被阻止。所有输出PrintActor
都顺利显示。但是出现的输出BlockingFutureActor
就像挤牙膏一样。我尝试通过 Intellij 的 Debug 监控我的线程信息,我得到:
您可能会发现只有两个调度员在睡觉(BlockingFutureActor
导致这种情况发生)。其他人正在等待,这意味着他们可以进行新消息传递。
我已经阅读了关于 Actor( page )中阻塞操作的答案。引用说“调度程序实际上是线程池。将两者分开可以保证缓慢的阻塞操作不会使另一个饿死。这种方法通常被称为批量标题,因为它的想法是如果应用程序的一部分发生故障,其余部分仍保持响应。”
默认调度程序是否会为阻塞操作保留一些调度程序?这样即使有很多阻塞操作要求调度程序,系统也可以处理消息。
Akka文档中的实验可以复现吗?是不是我的配置有问题。
感谢您的建议。最良好的祝愿。
java - How to create BalancingDispatcher using Akka in Java?
I want to scale my application UP and DOWN depending on no of requests from users for this i want to share the same mailbox with all the presently running actors. I think Balancing Dispatcher will solve my problem as per his work stealing algorithm. But I am not able to figure out how to do it. I am new to akka framework. Some code samples about this will be really helpful. I have few questions regarding this,
Thanks in advance
scala - Akka Streams Reactive Kafka - OutOfMemoryError under high load
I am running an Akka Streams Reactive Kafka application which should be functional under heavy load. After running the application for around 10 minutes, the application goes down with an OutOfMemoryError
. I tried to debug the heap dump and found that akka.dispatch.Dispatcher
is taking ~5GB of memory. Below are my config files.
Akka version: 2.4.18
Reactive Kafka version: 2.4.18
1.application.conf
:
2.build.sbt
:
3.Source
and Sink
actors:
In this case actorA
, actorB
, and actorC
are doing some business logic processing and database interaction. Is there anything I am missing in handling the Akka Reactive Kafka consumers such as commit, error, or throttling configuration? Because looking into the heap dump, I could guess that the messages are piling up.
scala - 如何在 Akka 中控制演员系统的运行时执行?
我一直在尝试找出一种方法来控制具有某种外部(与演员系统分开)控制器的演员系统内传递的运行时消息。换句话说,给定一个参与者系统(我不想更改):我如何设置一种控制器来控制其中的消息传递?
例如,假设给定的 Actor 系统具有以下设置:
我想完成以下任务:
- 在单个线程上同步运行此程序
- 对于系统内传递的每条消息:检查内容、发送者和接收者,并以此为基础;执行一些逻辑。
在上面的示例中,我想要一个“控制器”,它可以执行以下操作:
- 演员 A 从外部收到消息“开始”,并向演员 B 发送“消息”
- 在控制器上执行一些阻塞逻辑,即actor系统将空闲地等待这个逻辑被执行。
- 现在已经执行了逻辑,控制器向参与者系统发送绿灯以恢复消息传递。
- 演员 B 收到“消息”并打印“一些逻辑”
- 控制器检查参与者系统是否终止,它是,并执行一些额外的逻辑。
简而言之,我希望外部控制器能够在运行时控制 Actor 系统内的消息传递。
我在想这个控制器可能会使用调度程序、路由器参与者逻辑和期货来实现。我在 Akka 文档中没有找到任何关于此的示例,那么这甚至可以实现吗?
scala - 为什么在 Akka Dispatcher 上启动时 Futures 中的 Futures 顺序运行
当我们尝试从参与者的接收方法中启动多个期货时,我们观察到了一种奇怪的行为。如果我们将配置的调度程序用作 ExecutionContext,则期货在同一个线程上按顺序运行。如果我们使用 ExecutionContext.Implicits.global,则期货按预期并行运行。
我们将代码归结为以下示例(更完整的示例如下):
一个可编译的例子是这样的:
我们尝试了 thread-pool-executor 和 fork-join-executor,结果相同。
我们是否以错误的方式使用期货?那么你应该如何产生并行任务呢?
akka - “throughput-deadline-time”配置选项有什么作用?
我偶然发现了throughput-deadline-time
Akka 调度程序的配置属性,它看起来是一个有趣的选项,但是我可以在整个文档中找到的唯一提及如下:
我认为我们可以同意这不是很有帮助。
那么throughput-deadline-time
控制是什么,它对我的调度员有什么影响?
scala - 没有任务时,为什么 Akka 会关闭调度程序?
我想有一个固定的线程池和一旦创建的线程。所以,我创建了自己的ExecutorServiceConfigurator
:
并使用它:
但是每次,当我的程序没有任何任务时,Akka 都会关闭ExecutorService:
akka.dispatch.MessageDispatcher:
我无法理解这种行为。我认为,创建线程是昂贵的操作。
c# - 如何在 asp.net core akka 中正确配置调度程序
我正在向我的 akka.net 配置添加一个固定调度程序,因为我读到这将及时向我的演员发送消息,但是当我遵循 akka 配置时,我想出了这个
因为我想基于固定调度程序运行它,但目前我在终端中收到此错误
有人使用调度程序并正确配置它吗?您能建议对我的代码进行任何改进吗?请告诉我