问题标签 [akka-persistence]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
420 浏览

scala - 用于 inmem 日志插件的 Akka readJournalPluginId

我应该将哪个readJournalPluginId与 inmem 日记插件一起使用?

0 投票
0 回答
408 浏览

postgresql - Slick+HikariCP 在大量流媒体使用期间连接不足

我在应用程序的某些部分为 Akka Persistence使用akka-persistence-jdbc插件,并在其他部分直接使用。在将最热门的部分从直接光滑使用迁移到 akka-persistence HikariCP 后,HikariCP 开始抛出异常:

这不是慢速 SQL 查询的问题。解释分析表明 PostgreSQL 在大约 1 毫秒内执行它们。但是连接会等待一些东西并且不会被释放几分钟。更多细节在这里

在此处输入图像描述

akka-persistence-jdbc使用流进行写入和读取,这可能是一个狡猾的错误还是akka-persistence-jdbc 此类操作的方式有问题?

我正在使用 slick 3.1.1、HikariCP 2.3.7、PostgreSQL 9.4 和max_connections: 120.

0 投票
2 回答
912 浏览

akka - 使用 Akka 进行事件溯源时如何处理 CQRS 中的 Q?

与 Event Sourcing 结合使用时,是否有一种很好的 CQRS 方法?

我想到的一种方法是在命令处理程序(持久演员的)中执行此操作,一旦命令变成事件并持久保存到事件日志(这些事件代表写入模型),我将使用发送事件事件总线到感兴趣的订阅查询参与者,以便他们可以更新他们的查询模型。

我在想的另一种方式(如果日志支持它)是使用持久性查询(通过 Akka Streams),例如allPersistenceIdsorcurrentPersistenceIds并且查询端(可能的查询参与者)可以定期执行此操作。

我在正确的道路上吗?有更好的方法吗?

0 投票
1 回答
373 浏览

scala - 验证 Akka Persistence 命令的最佳实践

假设我们有一个命令AddUser

在持久化事件之前验证此命令的最佳方法是什么?例如,以这样的方式进行操作似乎有点混乱。

0 投票
0 回答
124 浏览

scala - Akka - 询问模式 - 组合结果

我正在设计一个演员系统。centerActor 需要等待 actor1 和 actor2 的结果才能响应客户端。

在中心演员:

在这种设计中,我的发件人(客户端)无法接收Done消息。我不知道为什么。所以我在考虑第二种设计:

0 投票
1 回答
542 浏览

cassandra - Akka Persistence Cassandra:时间戳!= 偏移量。或者如何只处理一次事件?

我目前正在使用 Akka Persistence 和 Cassandra Journal 插件构建一个事件源应用程序。我有一些视图需要捕获多个持久性 ID 的事件,因此我使用 eventsByTag 查询来更新(例如)我的 mongodb 视图。

如果我重新启动我的应用程序,查询会被重播,所以我需要以某种方式存储视图的状态,所以它不会重播已经处理过的事件。

起初我计划使用最后处理的事件的偏移量,因为 Cassandra 插件在内部使用 TimeUUID,它应该是唯一的。这里的问题是,Akka Journal 仅支持 Long 值作为偏移量,因此 TimeUUID 被转换回正常的时间戳。

所以例如:

2d2504b1-31f8-11e6-af83-9f34c8060f40 和 2d2504b2-31f8-11e6-af83-9f34c8060f40

两者都会导致相同的偏移量,如果我在同一毫秒内有多个事件,这对我来说在确定最后处理的事件方面毫无用处。

有没有人知道如何以更好的方式解决这个问题?

编辑


CassandraReadJournal提供了 getEventsByTag 流的重载版本,它返回 UUIDEventEnvelopes 。这包含作为 UUID 而不是 Long 的偏移量。

0 投票
1 回答
1240 浏览

akka - Akka 删除持久消息

我正在编写一个应用程序来处理设备引发的事件(每小时 100 万)。一些事件将被聚合(并且具有很长的时间跨度(例如 48 小时)),其中包含开始事件、状态(x 次)事件和结束事件。其他是可以立即处理的单个事件。为了至少保证一次事件将被处理,我正在研究 akka-persistence。应用程序的其他部分已经使用 akka 和 kafka。

我的目标解决方案应该包含一个持久映射,可以通过它们的 eventId 轻松地从中选择事件。顺序不太重要。处理完事件后,可以将其从地图中移除(并且不应再保留)。

在找到的文档/示例中,我找到了满足每个事件清除要求的队列示例,但在简单查找方面遇到了困难(必须循环队列才能找到事件)。为了满足我想使用地图的简单查找,使用 PersistentActor 特征和下面的一些数据库。然而,事件由序列号清除(这将删除需要更多处理/正在等待其他事件发生的事件)。调查的另一个特征是 AtLeastOnceDelivery,其交付确认满足要求,但在处理完所有事件之前,此特征会阻止恢复。

关于如何在 Akka 中为事件实现持久性篮子的任何想法?(我正在使用 scala 顺便说一句)

0 投票
1 回答
661 浏览

akka - Akka Persistence Query 和 actor 分片

我正在做 CQRS Akka 演员应用程序的查询端。

查询参与者设置为集群分片,并填充来自一个持久性查询流的事件。

我的问题是:

  1. 如果集群分片中的参与者之一重新启动如何恢复它?

    • 关闭整个集群分片并回复所有事件?
    • 使集群分片中的参与者持久化参与者并仅为查询端保存一组新事件?
  2. 如果使用 Persistence Query 填充的 actor 重新启动,我如何取消当前的 PQ 并重新启动它?

0 投票
2 回答
3342 浏览

scala - Akka Persistence Query 事件流和 CQRS

我正在尝试在我的 ES-CQRS 架构中实现读取端。假设我有一个这样的持久演员:

据我了解,每次事件持续存在时,我们都可以通过Akka Persistence Query. 现在,我不确定订阅这些事件的正确方法是什么,以便我可以将它保存在我的读取端数据库中?其中一个想法是最初UsersStream从我的读取方actor向UserWriteactor发送消息,并在该读取actor中“接收”事件。

编辑

根据@cmbaxter 的建议,我以这种方式实现了读取端:

有一些问题,例如: 事件流似乎很慢。即UserRead演员可以在保存新添加的用户之前用一组用户回答。

编辑 2

我增加了 cassandra 查询日志的刷新间隔,这更多地解决了慢事件流的问题。默认情况下,Cassandra 事件日志似乎每 3 秒轮询一次。在我的application.conf我补充说:

编辑 3

实际上,不要减少刷新间隔。这会增加内存使用量,但这并不危险,也不是一点。CQRS 的一般概念是写入端和读取端是异步的。因此,在您写入数据后将永远无法立即读取。处理用户界面?在读取端确认它们之后,我只需打开流并通过服务器发送的事件推送数据。

0 投票
1 回答
196 浏览

scala - Akka 集群分片不起作用

我在 Scala 2.11.8 上使用 akka 2.4.7。

事件即将到达 extractShardId 和 extractEntityId 但它们没有被传播到参与者的接收方法。有什么想法吗?

https://bitbucket.org/kuzelac/apt-billing/overview分支价格分片

已经定义了一个集群分片:

演员对象是

我已将 conf 设置为

演员正在被持久性查询播种