在做了一些研究并承认在处理命令(上面的第一种方法)期间更新读取端可能会失败的事实,这将涉及处理回滚和事务。更好的方法是使用持久性查询。这在很大程度上依赖于您的事件日志支持 Persistence Query 的不同功能(如AllPersistenceIdsQuery
和EventsByTag
),这些功能目前由 Cassandra、Event Store 和其他一些日志支持。
这个想法是将命令端与查询端分离,其中命令端根本不需要知道有任何查询端。这种解耦是使用Persistence Query提供给我们的。这个想法是命令端应该只关心验证传入的命令并将事件持久化到事件日志中。就是这样,没有查询端的意识。
现在对于 Query Side(s),您使用 Persistence Query like EventsByTag
or从日志中AllPersistenceIdsQuery
获取 a ,这是来自 Event Journal 的 s的背压实时流,并且您使用此抽象来提供您的 Read Sides。你可以在这里找到方法Source[Event]
Event
让我们想想失败
如果指挥方宕机了怎么办?
没有 new Event
s 将被持久化到 Event Journal,Read Side Persistence Queries 将愉快地Event
在流中产生 no new s。当命令端恢复时,一切都将恢复,现在读取端持久性查询将Event
在流中看到新的 s。
如果其中一个读取端出现故障怎么办?
不是什么大问题,你会擦除读取端数据库并从头开始重新启动持久性查询,然后我们就走了。我们绝对可以通过使用Resumable Projections的概念来改进这一点。这个想法是继续存储Event
您读取的每个当前偏移量,以便如果您的读取端出现故障,我们可以简单地从我们正在读取的点恢复,而不必从头开始。一个忠告,如果您需要一次有效的交付,那么您可能需要考虑使用幂等过滤器来避免重复。如果你这样做,你可以做一些优化,你不需要保留每个 ID,而只是在特定的时间间隔。
其他一些注意事项
如果您需要引入新的读取端并且它们从一开始就依赖于命令端事件怎么办?
这种解耦方法允许您执行此操作。这个想法是您不要从事件存储中删除事件,只需启动另一个持久性查询并让它为您的新读取端提供数据,如果您需要保持最新状态,请不要忘记使用可恢复投影。
这些方法隐含地具有持久性查询和在同一系统上提供读取端和可恢复投影的逻辑。
另一种方法是将 Persistence Query 与 Akka HTTP 结合起来,并公开一个流式端点,该端点公开您的事件日志,并允许您从一开始或从某个偏移量获取所有事件/某些事件。这种方式允许你做一些进一步的解耦,但如果你使用这种方法,你真的希望有 Resumable Projections,因为现在由于引入了 HTTP 而增加了失败。现在,您的 Read Sides 可以使用此流式传输端点并在其一侧拥有 Resumable Projection,并且可以以更加解耦的方式引入新的 Read Sides。
这些只是我开始使用 Akka 以来收集到的一些知识。也许一些更有经验的人对 CQRS 有更好的方法。
如果您正在寻找代码示例,我强烈建议您查看 Christian Baxter 的 Mastering Akka,它更详细地描述了该方法以及这篇文章。
谢谢阅读