1

我正在使用 akka-persistence 的 PersistenceQuery 将初始状态加载到管理内容的演员。我希望它在启动时只重播一次,但它会不断将它们发送到日志中。

14:11:28.405 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:28.407 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]
14:11:31.376 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:31.377 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]
14:11:34.376 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:34.378 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]

这是我为实现它而编写的程序。

implicit val mat = ActorMaterializer()(context)
val queries = PersistenceQuery(context.system).readJournalFor[LeveldbReadJournal](
      LeveldbReadJournal.Identifier)
val src: Source[EventEnvelope, NotUsed] = queries.eventsByPersistenceId("rooms", 0L, Long.MaxValue)
val events: Source[Any, NotUsed] = src.map(_.event)
val future = events.runWith(Sink.foreach{
  case x: RoomCreated => process(x)
  case x: RoomDeleted => process(x)
  case x => logger.error(s"Could not spawn $x")
})
4

1 回答 1

1

我认为您的预期行为与您实际看到的不同之处在于这eventsByPersistenceId是一个“实时”流。这意味着它不仅会返回从您提供的偏移范围内开始的事件(您从 0 开始并转到 Long.MaxValue,所以一切),但如果新事件进入,它将继续向您发送。如果您不这样做不想要直播,然后将呼叫currentEventsByPersistenceId改为。这将仅包括截至该时间点(您提出请求的时间)的内容,而不是实时流。这应该是你正在寻找的。

于 2016-07-25T16:26:36.790 回答