4

与 Event Sourcing 结合使用时,是否有一种很好的 CQRS 方法?

我想到的一种方法是在命令处理程序(持久演员的)中执行此操作,一旦命令变成事件并持久保存到事件日志(这些事件代表写入模型),我将使用发送事件事件总线到感兴趣的订阅查询参与者,以便他们可以更新他们的查询模型。

我在想的另一种方式(如果日志支持它)是使用持久性查询(通过 Akka Streams),例如allPersistenceIdsorcurrentPersistenceIds并且查询端(可能的查询参与者)可以定期执行此操作。

我在正确的道路上吗?有更好的方法吗?

4

2 回答 2

3

我认为您首先提到的方法将毫无问题地起作用。对于第二个,我不会这么说。如果我对您的理解正确,您希望查询投影以获取更新,而不是通过事件总线推送。我认为这里的问题是您必须区分已经处理的事件和新的事件(更新)。我不确定 Akka EventStore 日志是否可以处理这个问题,但我对此表示怀疑。

于 2016-04-29T09:35:25.347 回答
2

在做了一些研究并承认在处理命令(上面的第一种方法)期间更新读取端可能会失败的事实,这将涉及处理回滚和事务。更好的方法是使用持久性查询。这在很大程度上依赖于您的事件日志支持 Persistence Query 的不同功能(如AllPersistenceIdsQueryEventsByTag),这些功能目前由 Cassandra、Event Store 和其他一些日志支持。

这个想法是将命令端与查询端分离,其中命令端根本不需要知道有任何查询端。这种解耦是使用Persistence Query提供给我们的。这个想法是命令端应该只关心验证传入的命令并将事件持久化到事件日志中。就是这样,没有查询端的意识。

现在对于 Query Side(s),您使用 Persistence Query like EventsByTagor从日志中AllPersistenceIdsQuery获取 a ,这是来自 Event Journal 的 s的背压实时流,并且您使用此抽象来提供您的 Read Sides。你可以在这里找到方法Source[Event]Event

让我们想想失败

如果指挥方宕机了怎么办?

没有 new Events 将被持久化到 Event Journal,Read Side Persistence Queries 将愉快地Event在流中产生 no new s。当命令端恢复时,一切都将恢复,现在读取端持久性查询将Event在流中看到新的 s。

如果其中一个读取端出现故障怎么办?

不是什么大问题,你会擦除读取端数据库并从头开始重新启动持久性查询,然后我们就走了。我们绝对可以通过使用Resumable Projections的概念来改进这一点。这个想法是继续存储Event您读取的每个当前偏移量,以便如果您的读取端出现故障,我们可以简单地从我们正在读取的点恢复,而不必从头开始。一个忠告,如果您需要一次有效的交付,那么您可能需要考虑使用幂等过滤器来避免重复。如果你这样做,你可以做一些优化,你不需要保留每个 ID,而只是在特定的时间间隔。

其他一些注意事项

如果您需要引入新的读取端并且它们从一开始就依赖于命令端事件怎么办?

这种解耦方法允许您执行此操作。这个想法是您不要从事件存储中删除事件,只需启动另一个持久性查询并让它为您的新读取端提供数据,如果您需要保持最新状态,请不要忘记使用可恢复投影。

这些方法隐含地具有持久性查询和在同一系统上提供读取端和可恢复投影的逻辑。

另一种方法是将 Persistence Query 与 Akka HTTP 结合起来,并公开一个流式端点,该端点公开您的事件日志,并允许您从一开始或从某个偏移量获取所有事件/某些事件。这种方式允许你做一些进一步的解耦,但如果你使用这种方法,你真的希望有 Resumable Projections,因为现在由于引入了 HTTP 而增加了失败。现在,您的 Read Sides 可以使用此流式传输端点并在其一侧拥有 Resumable Projection,并且可以以更加解耦的方式引入新的 Read Sides。

这些只是我开始使用 Akka 以来收集到的一些知识。也许一些更有经验的人对 CQRS 有更好的方法。

如果您正在寻找代码示例,我强烈建议您查看 Christian Baxter 的 Mastering Akka,它更详细地描述了该方法以及这篇文章

谢谢阅读

于 2017-03-22T02:58:24.490 回答