我一直在研究实验性的 Akka Persistence Query 模块,并且对为我的应用程序实现自定义读取日志非常感兴趣。该文档描述了两种主要的查询方式,一种返回日志的当前状态(例如CurrentPersistenceIdsQuery
),另一种返回可订阅的流,当事件通过应用程序的写入端提交到日志时发出事件(例如AllPersistenceIdsQuery
)
对于我设计的应用程序,我使用 Postgres 和 Slick 3.1.1 来驱动这些查询。我可以通过执行以下操作成功地流式传输数据库查询结果:
override def allPersistenceIds = {
val db = Database.forConfig("postgres")
val metadata = TableQuery[Metadata]
val query = for (m <- metadata) yield m.persistenceId
Source.fromPublisher(db.stream(query.result))
}
但是,一旦底层 Slick DB 操作完成,就会通知流完成。这似乎无法满足能够发出新事件的永久开放流的要求。
我的问题是:
- 有没有办法纯粹使用 Akka Streams DSL 来做到这一点?也就是说,我可以发送一个无法关闭的流吗?
- 我已经对 LevelDB 读取日志的工作方式进行了一些探索,并且它似乎通过让读取日志订阅写入日志来处理新事件。这似乎是合理的,但我必须问 - 一般来说,有没有推荐的方法来处理这个要求?
- 我考虑过的另一种方法是轮询(例如,定期让我的阅读日志查询数据库并检查新事件/ID)。比我更有经验的人能提供一些建议吗?
谢谢!