问题标签 [akka-actor]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
3 回答
807 浏览

scala - 如何在 Akka Actor 中检查消息是否有发件人

有时发送给参与者的消息没有发送者,例如,如果它们是这样发送的:

一个用例是表明您对对此的响应不感兴趣Message

在演员的receive方法中,我如何检查是否sender()是演员,反对noSender

我想出的最好的方法是基于参与者路径的以下测试,但我不确定我是否可以依靠它在所有情况下和未来的变化、Akka 集群等中工作。

如果没有发件人,我不希望回复转到deadLetters,因为在我的系统中,我将未送达的消息视为出现问题的警告。

有没有更好更可靠的方法来检查是否有发件人?

0 投票
1 回答
339 浏览

java - 如何检测akka actor终止是由于系统关闭并避免重新启动它

我有一个使用小型 Akka 演员系统(使用 Java)的 Spring 应用程序,其中我有一个MasterActor扩展 Akka 的 Akka AbstractActor,它初始化 aRouter并设置了一些工作演员。它还监视工人的生命周期。如果 Worker 演员因某些Exception.

我遇到的问题是,如果 Spring 应用程序无法启动。(例如它无法连接到数据库,或者某些凭据不正确等),我正在接收Terminated来自所有工作人员的消息,并且主演员尝试启动新的演员,这些演员也会Terminated立即进入一个无限循环。

检测这种情况的正确方法是什么?有没有办法让主演员检测到演员系统正在关闭,以便工人不会再次重新启动?

0 投票
0 回答
43 浏览

java - 阿卡版本升级

我有四个使用 Akka Actor 相互通信的 Java 项目(Web 服务器)。我的问题是:将其中一个项目中的 Akka 版本从 2.1.2 升级到最新版本有什么副作用?

0 投票
2 回答
267 浏览

scala - Akka - 容错场景 - 测试失败

我在测试中模拟三个演员(A,B,C)之间的对话

MessageB2C成功到达 C 时,确认被发送回源。

这次谈话的唯一特点是信息MessageB2CMessageB2C至少每 50 ms 发送一次,直到 C 没有用它的确认来回答。

我已经用 scala testkit 框架实现了这个简单的对话,但是在特定情况下测试失败了。

https://github.com/freedev/reactive-akka-testkit

当 ActorB 重试发送 MessageB2C 的次数超过一次时,则无法从 ActorC 接收到答案。ActorC 对 ActorB 的回复发送到 deadLetters。

这是ActorB代码:

更新

奇怪的是,测试成功完成评论该行actorA.expectNoMessage

0 投票
0 回答
977 浏览

akka - 即使作业正在运行,Flink 作业提交也会失败

我正在使用带有 1 个 jobManager 和 n 个 taskManagers 的独立 Flink 集群。当我尝试通过命令行提交作业时,作业提交失败,错误消息为 org.apache.flink.client.program.ProgramInvocationException: Could not submit job (JobID: f839aefee74aa4483ce8f8fd2e49b69e).

在 jobManager 实例上,一切正常,直到作业从 DEPLOYING 切换到 RUNNING。发布后,一旦 akka-timeut 到期,我会看到以下堆栈跟踪

我浏览了 github 上的 flink 代码,执行作业所需的所有步骤似乎都运行良好。但是,当 jobManager 必须向触发作业的 flink 客户端发出作业提交确认时,jobSubmitHandler 在 akka 调度程序上超时,根据我的理解,该调度程序负责与作业客户端进行通信。

Flink 作业由 1 个 Source(kafka)、2 个 operator 和 1 个 sink(Custom Sink)组成。以下链接显示了 jobManager 日志: https ://pastebin.com/raw/3GaTtNrG

一旦调度程序超时,所有其他 Flink UI 调用也会超时并出现相同的异常。

以下是用于通过命令行提交作业的 flink 客户端日志。

我已经打开了 flink、akka 和 kafka 的调试日志,但无法弄清楚出了什么问题。我对akka有非常基本的了解,因此无法弄清楚出了什么问题。有人可以帮我吗?我正在运行 flink 1.8.0。

0 投票
2 回答
172 浏览

akka - 在 Akka 中将实体保留为状态或子角色

考虑一个在线购物的用例,我有一个库存和物品。我看到了几个使用 Akka 演员来建模的选项。

  1. 例如,创建一个名为 Inventory 的持久性参与者,将处于其状态的项目保存在列表中。

  2. 创建一个名为 Inventory 的参与者,然后为每个项目创建一个子持久参与者。每个项目保持自己的状态。

问题是——第二种选择有意义吗?我什么时候应该将实体保持为演员的状态或将其建模为子演员?在这种情况下我们应该考虑什么?

0 投票
2 回答
207 浏览

scala - 如何避免在具有继承行为的 Akka Actor 中改变状态变量?

我有一些具有共同行为的 Akka 演员。这种常见行为在 trait 中定义:

Actor 覆盖了这个 trait 并定义了额外的行为orElse。这是这种演员的一个例子:

我知道改变状态是一种反模式,我想使用context.become. 问题是,当用 更改状态时MyActorcontext.become我不知道 commonActions 的参数。甚至可以继承行为吗?我是否需要更大的重构(例如创建代理演员)?这是我已经走了多远:

0 投票
1 回答
551 浏览

scala - Akka-actor java.lang.NoClassDefFoundError: akka/actor/CoordinatedShutdown$$anonfun$totalTimeout

在运行 mvn clean install 时,在所有测试用例都通过后,我遇到了以下错误。

0 投票
1 回答
227 浏览

akka - Akka Dispatcher 线程创建

我一直在研究 Akka Actor 模型。我有一个用例,其中将有 1000 多个演员处于活动状态,我必须处理这些演员。我想过通过application.conf中定义的配置来控制线程数。

但不是。在我的应用程序中创建的调度程序线程使我无法调整调度程序配置。每次重新启动我的应用程序时,我都会看到创建了不同数量的调度程序线程(我每次启动应用程序后都通过线程转储检查了这一点)。

甚至线程数也不等于我在 parallelism-min 中定义的线程数。由于线程数很少,我的应用程序处理速度非常慢。在检查没有。通过以下代码在我的机器中获取核心:

Runtime.getRuntime().availableProcessors();

它显示 40。但没有。即使我将并行度配置为 500,创建的调度程序线程数也少于 300。

以下是我的 application.conf 文件:

我可以知道 akka 将在什么基础上在内部创建调度程序线程以及如何增加调度程序线程数以增加参与者的并行处理?

0 投票
1 回答
53 浏览

akka-actor - scala并发编程中akka actor的奇怪行为

我的代码:

我用相同的数字运行了 3 次,输出:

错误的!

这是对的

错误的!