3

我一直在研究实验性的 Akka Persistence Query 模块,并且对为我的应用程序实现自定义读取日志非常感兴趣。该文档描述了两种主要的查询方式,一种返回日志的当前状态(例如CurrentPersistenceIdsQuery),另一种返回可订阅的流,当事件通过应用程序的写入端提交到日志时发出事件(例如AllPersistenceIdsQuery

对于我设计的应用程序,我使用 Postgres 和 Slick 3.1.1 来驱动这些查询。我可以通过执行以下操作成功地流式传输数据库查询结果:

override def allPersistenceIds = {
  val db = Database.forConfig("postgres")
  val metadata = TableQuery[Metadata]

  val query = for (m <- metadata) yield m.persistenceId
  Source.fromPublisher(db.stream(query.result))
}

但是,一旦底层 Slick DB 操作完成,就会通知流完成。这似乎无法满足能够发出新事件的永久开放流的要求。

我的问题是:

  • 有没有办法纯粹使用 Akka Streams DSL 来做到这一点?也就是说,我可以发送一个无法关闭的流吗?
  • 我已经对 LevelDB 读取日志的工作方式进行了一些探索,并且它似乎通过让读取日志订阅写入日志来处理新事件。这似乎是合理的,但我必须问 - 一般来说,有没有推荐的方法来处理这个要求?
  • 我考虑过的另一种方法是轮询(例如,定期让我的阅读日志查询数据库并检查新事件/ID)。比我更有经验的人能提供一些建议吗?

谢谢!

4

2 回答 2

4

它不像这一行代码那么微不足道,但是您已经是正确的轨道之一。

为了实现“无限”流,您需要多次查询——即实现轮询,除非底层数据库允许无限查询(这里它不支持 AFAICS)。

轮询需要跟踪“偏移量”,因此如果您通过某个标签进行查询,并且您发出另一个轮询,您需要从“最后一个发出的元素”开始该(第二次)查询,而不是开始再次的表。所以你需要某个地方,很可能是一个 Actor,来保持这个偏移量。

Query Side LevelDB 插件并不是其他实现的最佳角色模型,因为它对底层日志及其工作方式做了很多假设。此外,LevelDB 不适合使用 Akka Persistence 进行生产——它是我们发布的一个日志,目的是为了拥有一个可以开箱即用的持久日志(无需启动 Cassandra 等)。

如果您正在寻找灵感,MongoDB 插件实际上应该是一个很好的来源,因为它们具有与 SQL 存储非常相似的限制。我不确定当前是否有任何 SQL 日志实现了查询端。

于 2016-01-07T01:06:10.317 回答
0

可以使用Postgres 复制 API来获取“无限”的数据库事件流。从版本 42.0.0 开始的 Postgres JDBC 驱动程序支持它,请参阅相关的拉取请求。但是,它不是真正的流,而是来自数据库 WAL 的缓冲同步读取器。

PGReplicationStream stream =
    pgConnection
        .replicationStream()
        .logical()
        .withSlotName("test_decoding")
        .withSlotOption("include-xids", false)
        .withSlotOption("skip-empty-xacts", true)
        .start();
while (true) {
  ByteBuffer buffer = stream.read();
  //process logical changes
}

对于这个读者来说,在alpakka 项目中有一个 Akka Streams 适配器(源代码)会很好。

于 2017-03-03T22:39:10.427 回答