问题标签 [akka-persistence]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - Akka Cluster、Cluster Sharding 和 Cluster 单例用例
我正在编写一个 Akka 应用程序,我想使用 Typsafe 提供的 Akka Cluster 功能。我试图了解 Akka Cluster、Akka Cluster Singleton 和 Akka Cluster Sharding 之间的主要区别。对我来说,他们似乎提供了类似的东西。
在我的场景中,我想要一个集群,它将具有:
- 可以存在于任何机器中的无状态actors(我将拥有一个池)
- 我希望他们生活在特定机器中的无国籍演员
- 可以存在于任何机器中的有状态参与者,但我只需要它们在整个集群中的一个实例,并且能够在参与者死亡时以之前的状态重新启动参与者。
根据我所阅读的内容,我需要 Akka Cluster 作为第一种,Akka Cluster Singleton 用于第二种,Akka Cluster Sharding 用于第三种。我的假设正确吗?如果您知道的话,我正在寻找一些指导以及任何在线示例。
cqrs - Generating commands when events happen in Akka Persistence
I am implementing an EventSourcing application that handles a large number of original and derived data points. In short, we have an PersistentActor
functioning as an Aggregate Root accepting commands:
After these commands are verified, they produce events which are persisted and update the state:
In a PersistentView
we listen to these events and calculate derived values:
But this recalculation itself is a new value on which other views could depend, so that means that we have to send a new command back to the aggregate root to process the new value, which can produce events that can be picked up by this or other views.
Is it acceptable for a view to produce events or commands? I would think a view's responsibility is to update a state based on events, not produce events or commands. Also, the order in which events arrive can influence the new events being broadcast during replay.
Is it necessary to produce commands instead of events? Since the command was updating the initial value, you could argue that producing all the derived values are simply events that are the result of the command being processed, although they are generated in a distributed fashion and not directly by the aggregate root.
I've been looking at Akka's Reactive Streams that could be used to string these actors together, and also looked at the idea of Sagas as presented here: http://blog.jonathanoliver.com/cqrs-sagas-with-event-sourcing-part-i-of-ii/. In that post Jonathan mentions:
Sagas listen to events and dispatch commands while aggregates receive commands and publish events.
That seems like a sensible approach as well, to implement all these actors as FSMs: wait 5 seconds for related events, recalculate everything, dispatch command, wait 5 seconds for events, etc.
To make things a little bit more interesting, the streams of values can be out of order and incomplete, but should produce derived values at points in time. So if I receive values A & B:
- A1, B1, B2, A2, B3, A4, B4
it should produce derived values D:
- D1 (A1 * B1), D2 (B2 * A2), D3 (B3 * A2, there is no A3), D4 (A4 * B4)
This means I have to keep track of order, and sometimes reissue a derived value if a missing value comes in.
Thanks!
java - 无法序列化从泛型特征派生的泛型案例类
我正在使用以下代码:
使用 Akka Persistence 持久化这些事件并重构 actor 的状态。但是,在致电persist(ev)
时,我收到以下异常:
似乎 Java 序列化程序无法处理派生的通用案例类?我已经尝试过sealed trait Event extends Serializable
,但这并没有太大帮助。
无论如何有解决方案吗?也许其他序列化程序?还是另一种制作一组通用事件/命令/方法的方法?
java - 对 Akka UntypedPersistentActor 的同步(或未来返回)调用?
我想通知调用者成功应用了发送到我的对象的状态更改命令,该对象扩展了 UntypedPersistentActor。
如果我不坚持状态更改,我会返回一个带有影响更改的闭包的 Future,并让调用者等待。
如果我理解正确的话,Akka 的“persist()”调用会强制我传入一个将异步执行的闭包。如果我返回一个本身调用persist() 的Future,我只是告诉调用者我已经成功地将更改排队等待以后的应用程序......而不是它已被应用,甚至它的应用程序将成功。
我想关于一个调用者的保证排序有点满足知道何时应用更改的需要,但是如果更改失败怎么办?如果被调用者因为失败而重新启动,调用者的消息将被丢弃,状态改变永远不会发生,调用者将不知情。将错误返回给调用者并在那里显示它似乎会更干净。
有没有好的方法来实现这一目标?
java - 无法将 Spring 服务注入 akka 服务
我有 Spring 服务,它实际上是演员,它接收到信息,但我不能将它传递给另一个 Spring 服务,因为注入失败。
和第二次服务
Spring 配置,这是来自 akka spring 示例
那么,我如何才能注入另一个 Spring 服务?????????
scala - akka-persistence中persistenceId的唯一性
我正在使用用于 akka-persistence 的 scala api 来持久化一组被组织成树的参与者实例。树中的每个节点都是一个持久的参与者,并根据从“根”节点到该节点的路径来命名。persistenceId 设置为名称。例如,根节点 actor 具有 persistenceId 'root'。下一个节点有 persistenceId 'root-europe'。另一个参与者可能有 persistenceId 'root-europe-italy'。
每个参与者的状态包括其孩子的姓名列表。例如,“根”参与者维护一个“欧洲”、“亚洲”等列表作为其状态的一部分。
我已经为这个系统实现了快照。当根被触发快照时,它会这样做,然后告诉每个孩子也这样做。
问题出现在快照恢复期间。当我重新创建一个具有 persistenceId = 'root' 的演员(通过将名称作为构造函数参数传递)时,该演员收到的 SnapshotOffer 事件是错误的。例如,它是“root-europe-italy....”。这似乎与持久性合同相矛盾,其中persistenceId 标识要恢复的参与者状态。我通过反转节点参与者的persistenceId(例如'italy-europe-root')解决了这个问题,所以这似乎与持久性模块检索文件的方式有关。请注意,我首先尝试了其他方法,例如我在节点名称之间使用了各种分隔符,或者根本没有分隔符。
有没有其他人遇到过这个问题,或者 akka-persistence 开发人员可以帮助我理解为什么会发生这种情况?
顺便说一句:我现在正在使用内置的基于文件的快照存储。
谢谢。
scala - 克服 Akka Persistence 中对持久消息类的更改
假设我从这样的 Akka Persistence 系统开始:
然后有一天我把它改成这样:
在我部署新系统后,当实例MyProcessor
尝试恢复其状态时,日志消息将属于前一个案例类。因为它期待后一种类型,它会抛出一个OnReplayFailure
,使处理器无用。问题是:如果我们假设缺席y
可以等于0
(或其他)是否有最佳实践来克服这个问题?例如,也许implicit
在恢复时使用 an 从前者消息转换为后者?
java - Akka持久化自定义java插件
我目前正在为Akka
SyncWriteJournal
API 编写自己的插件以实现与HSQLDB
.
问题是我不明白该方法的要求doAsyncReplayMessages
。它声明它需要返回一个未来,并且所有的消息都应该由replayCallback
.
假设我有一个返回消息列表的查询:List<Message> messages
。任何人都可以提供如何使用的最小示例(带有解释)replayCallback
,并Future
通过使用该列表正确实现该方法吗?将如何协同工作replayCallback
以及Future
该方法应该返回什么doAsyncReplayMessages
?
谢谢!
-编辑-
在一些评论的帮助下,我提出了一个不完整的实现,但包含了所提出的想法:
正如您可能看到的,它遗漏了一些我仍然遗漏的关键概念。PersistentImpl 需要一个仍然存在的Seq<String> confirm
参数null
。也许更重要的是我回来null
了,因为未来期望Void
作为返回类型,我不确定如何实现它。它目前抛出一个 NPE:
domain-driven-design - DDD/CQRS 查询事件
我正在查看有关在使用 Event Sourcing/DDD/CQRS 方法设计的应用程序中进行查询的帖子。
据我了解,事件是对域对象状态的更改。对状态的更改将作为历史记录/事件在数据库中进行维护(任何 sql/no sql)。
如果用户想要查询特定聚合根的当前状态,它将涉及获取事件历史记录。
当用户将查询特别是业务特定查询时,他/她将对当前状态而不是事件历史感兴趣。
CQRS 中的查询或“Q”部分如何与事件溯源一起使用?
考虑我有一个域对象“帐户”作为聚合根。账户 AR 将经历很多变化,即贷记借记。事件存储将有贷记和借记事件。
考虑用户需要获取帐户的当前余额,事件历史流将如何适应这里?用户将如何获取给定帐户的当前余额?
我无法理解,对于特定业务的事件查询历史记录将如何有用?
-Prakhyat MM
akka-persistence - Dispatcher 配置以获得更好的性能:Akka Persistence/DDD/CQRS
我正在开发一个使用 akka 持久性、akka 远程处理、akka 分片的应用程序。我们正在采用 DDD/CQRS 方法。
以下是应用程序配置,
我们正在为整个应用程序创建一个演员系统。如果我错了,请纠正我,默认情况下,默认调度程序使用具有至少 8 个线程的 fork-join 线程池。
我们需要支持来自客户的数百万个请求。使用akka持久化/分片/集群时如何调整actor系统的调度器,什么是推荐的调度器类型如何计算每个线程池的线程我们需要考虑核心吗?我们是否需要考虑操作系统的类型 JVM 配置是否重要 [注意:] 建议上面列表中的任何遗漏参数。还有如何在 app conf 中指定自定义调度程序配置。
基本上想了解标准来决定每个参与者系统执行调度程序。
是否需要任何其他配置来调整 akka 持久性/集群/分片类型的应用程序以获得良好的性能。
此外,调整 akka 持久性/集群/分片类型的应用程序的建议也会有很大帮助。
-Prakhyat MM