问题标签 [akka-actor]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
akka - “throughput-deadline-time”配置选项有什么作用?
我偶然发现了throughput-deadline-time
Akka 调度程序的配置属性,它看起来是一个有趣的选项,但是我可以在整个文档中找到的唯一提及如下:
我认为我们可以同意这不是很有帮助。
那么throughput-deadline-time
控制是什么,它对我的调度员有什么影响?
scala - Akka Actor 消息传递延迟
我在使用多个请求扩展我的应用时遇到问题。
每个请求都会向一个参与者发送一个请求,然后该参与者会产生其他参与者。这很好,但是,在负载下(5+ 一次询问),ask
将消息传递给目标参与者需要大量时间。最初的设计是平均分配请求,但这会造成瓶颈。例子:
在这张图片中,ask
在查询计划解析器之后发送。但是,当 Actor 收到此消息时,会有几秒钟的间隔。这仅在负载下(5+ 请求/秒)才会出现。我首先认为这是一个饥饿问题。
设计:每个planner-executor 是每个请求的单独实例。它每次都会产生一个新的“请求接受者”角色(它在收到消息时记录“请求分数”)。
- 我给了actorsystem一个自定义的全局执行器(大的)。我注意到即使在这个巨大的延迟期间,线程也没有超过核心线程池大小
- 我确保子演员中的所有执行上下文都使用了正确的执行上下文
- 确保actor内部的所有阻塞调用都使用了future
- 我给了父actor(和所有子actor)一个自定义调度器,核心大小为50,最大大小为100。即使在这些延迟期间,它也没有要求更多(它保持在50)
- 最后,我尝试为每个请求创建一个全新的 Actorsystem(在 planner-executor 内)。这也没有明显的效果!
我对此有点难过。从这些测试来看,它看起来不像是线程饥饿问题。回到第一点,我不知道为什么消息需要越来越长的时间来传递我发出的更多并发请求。到达此点之前的 Zipkin 跟踪在到达此处之前不会因更多请求而降级ask
。在此之前,服务器能够处理多个步骤,例如验证请求、与数据库对话,然后最终进入计划执行程序。所以我怀疑应用程序本身的 cpu 时间用完了。
scala - 如何检查akka actor中的存储大小
我已经使用 Akka actor 中的 stash 方法在 actor 中实现了 stash,但现在需要查看它的大小(即没有 stash 中的消息)。有什么办法吗?
以下是方法及其文档 -
scala - Akka 演员,等待完成初始化问题
我的应用程序使用内部初始化其他演员(孩子)的主管 akka 演员。但是,由于它是异步执行的,因此我在尝试使用 Akka TestKit 编写测试时遇到了问题。
例如,当尝试通过向主管的子actor发送消息来测试系统如何从另一个系统终止时,我创建了一个新的actor系统,然后使用actorOf一个接收配置的主管(用于创建孩子主管内部的演员),如下所示:
然后,如果我尝试使用actorSelection向子actor发送消息,AnotherSupervisor
它会失败,因为子actor还不是selectable
:
失败,因为还没有AnotherManagementReceiver
创建。
使用Thread.sleep(5000)
作品,但这太糟糕了。
在寻找可能的解决方案后,我测试了:
也因现有的演员消息而失败。
还尝试了 EventFilter 来让子角色在日志中回显一些内容:
这一直等到超时(我什至看到了日志消息),但我猜 EventFilter 只是从 TestKit 创建的“主要”演员系统中读取。
关于如何处理这种情况的任何想法或建议?
java - 如何获取 Akka Actors 的类型?
如何验证创建的参与者的类型?
我遇到了一个问题,我创建了错误的演员。我正在使用 guice 创建演员,因此创建与业务逻辑分离。
问题的根本原因是我创建了一个 Actor(我们称他为 X),而不是一个转发给 X 类型 Actor 的 RouterActor。
因此,对于正确性测试来说,一切看起来都很好,但是当达到更高的负载和阻塞参与者的超时时,服务失败了。
到目前为止的方法
- 不测试(这实际上是导致问题的原因)
- 通过获取响应的参与者路径进行测试(这似乎不必要地令人费解)
- 设定一个性能目标,不要测试实现,而是检查目标是否成立。即:对于每秒 100 个请求,其中 10% 的请求超时保证响应延迟。
java - akka-streams + akka 演员:mapAsyncUnordered + 询问模式问题
我一直在使用 akka-actors 和 akka-cluster 一段时间。最近有个需求给代码加背压,用akka-streams原生模式mapAsyncUnordered + ask(消息顺序无所谓)来解决这个问题。
问题类似于探索一棵树。
如果树很小,这似乎工作正常。但是,如果树的大小增加,代码似乎会卡住并且无法运行。树可能有 1 亿个节点甚至更多。
有人可以帮助我如何使其工作或提出更好的方法。
scala - 通过持久化 id 查找演员
我有一个系统,每个用户都有一个演员。用户很少发送消息,但当他们发送消息时,他们通常不仅发送一条,而且发送很少。
目前,我有一张地图,我存储persistenceId -> ActorRef
. 当我收到一个演员的新消息时,我会查看地图,如果有 ActorRef,我会使用它。如果它丢失了,我会创建它并将其放入地图中。当然,我不想同时拥有相同持久性参与者的 2 个实例。另外,我不想为每条消息创建和销毁actor,因为恢复可能需要一些时间。
我觉得应该有一些更简洁的方式来“定位或创造”一个演员。类似的东西actorSystem.getOrCreate(persistenceId, props)
。我认为分片可能会帮助我,但我找不到一个确切的例子。另外,我知道有actorSelection
,它有缺点:
- 在太多地方使用它,硬编码的路径很难维护
- 使用它来发送太多消息,因为它有性能成本
所以基本上问题是如果我的演员persistenceId是userId,那么在一项服务中定位持久演员的最佳方法是什么。如果我决定使用分片,那么每个演员 1 个分片。这个可以吗?
akka - 用于 http 请求 Java 的 Akka 演员
您好我正在尝试在 AKKA - Java 中寻找一个简单的示例来创建一个 HTTP 客户端和一个 Actor。到目前为止,我能够创建一个请求并获得响应 Http Entity。我需要将它迁移到一个演员,所以我可以在超时的情况下并行调用多个演员。
scala - Akka:在邮箱中保留不匹配的消息
我熟悉 Erlang/Elixir,其中进程邮箱中的消息保留在邮箱中,直到它们匹配:
这些模式
Pattern
按时间顺序与邮箱中的第一条消息顺序匹配,然后是第二条,依此类推。如果匹配成功并且可选的保护序列GuardSeq
为真,Body
则评估对应的。匹配的消息被消费,即从邮箱中删除,而邮箱中的任何其他消息保持不变。
(http://erlang.org/doc/reference_manual/expressions.html#receive)
但是,对于 Akka Actor,不匹配的消息会从邮箱中删除。例如,在哲学家就餐模拟中实现分叉时,这很烦人:
当一条Take(<philosopher>)
消息被发送到分叉时,我们希望消息一直留在邮箱中,直到分叉被释放并且消息被匹配。但是,在 AkkaTake(<philosopher>)
中,如果当前采用分叉,则从邮箱中删除消息,因为没有匹配项。
目前,我通过重写unhandled
Fork actor 的方法并将消息再次转发到 fork 来解决这个问题:
我相信这是非常低效的,因为它会一直将消息发送到分叉,直到匹配为止。有没有另一种方法来解决这个问题,不涉及不断转发不匹配的消息?
我相信在最坏的情况下,我将不得不实现一个模仿 Erlang 邮箱的自定义邮箱类型,如下所述:http: //ndpar.blogspot.com/2010/11/erlang-explained-selective-receive.html
编辑:我根据 Tim 的建议修改了我的实现,并按照建议使用了 Stash 特征。我的Fork
演员现在看起来如下:
但是,我不想到处写stash()
andunstashAll()
调用。相反,我想实现一个自定义邮箱类型来为我执行此操作,即存储未处理的消息并在参与者处理消息时取消存储它们。这可能吗?
我尝试实现一个自定义邮箱来执行此操作,但是,我无法确定消息是否与接收块匹配。
akka-http - 如何通过 Akka HTTP (Java) 与 Akka Actor 交互
话题
我想通过 Akka HTTP 与 Akka Actor 交互。这个想法是有一个系统,其中 HTTP 客户端调用 Akka HTTP 服务器方法,该方法处理对 Akka Actor 的请求。参与者处理消息并响应调用者(Akka HTTP),调用者回答 HTTP 客户端。我设法按照上面的描述做了,但我认为我做的不对,因为我的实现似乎是阻塞的。
我解释得更好:如果我发出许多并发 HTTP 请求,我会看到 Akka HTTP“创建一个队列”,因此在发送以下请求之前等待参与者处理请求。
相反,我想获得的是Akka HTTP 服务器将来自 HTTP 客户端的请求立即转发到目标 akka 演员,而无需等待演员结束阐述。 我想使用actor邮箱容量参数来确定消息队列有多大,如果消息太多则拒绝消息。
因此,我需要一种方法让 Akka HTTP 异步等待参与者响应。
我知道邮箱容量工作正常,因为如果我改为使用简单的actor2.tell("Prova1", system.deadLetters())向我的演员发出许多请求(仅用于测试),超过邮箱大小的请求是正确的被拒绝。
参考
为了测试我的系统,我按照 akka 文档提供的最小示例创建了一个简单的配置。这对于 akka http: https ://doc.akka.io/docs/akka-http/current/routing-dsl/index.html#minimal-example
以及用于创建我的演员的以下内容: https ://doc.akka.io/docs/akka/current/actors.html#creating-actors
我的代码
我做的第一件事是创建一个带有一个actor(actor1)的系统,配置akka HTTP如下:
我的 ActorTest 如下:
我的 application.conf 非常简单:
预期成绩
如您所见,如果邮箱容量 = 1,我希望,如果我发出超过 1 个并发请求,则只有一个被处理,其余的被丢弃。
我认为上面的代码对于我想要获得的内容不正确,因为我使用 Akka HTTP 路由来接收http://127.0.0.1/mysuburl/actor1/my_msg上的 HTTP 请求,然后使用收件箱发送消息到演员并等待回应。
所以我的问题是:以异步方式将我的 Akka HTTP 请求链接到我的 Akka Actor actor 1 的正确方法是什么?
如果您需要更多详细信息,请告诉我。
笔记
我什至阅读了以下文章: https ://doc.akka.io/docs/akka-http/current/handling-blocking-operations-in-akka-http-routes.html
它解释了如何创建有限数量的线程以处理多个阻塞请求,但我认为这只会“减轻”我的代码的影响,它是阻塞的,但必须以不阻塞的方式编写。