我有一个PersistentActor
事件persist
,我想以相反的顺序阅读它们。更具体地说,我想Source
为以相反顺序发出事件的事件获取一个。这对于回答有关为此发生的最后 N 个事件的查询很有用PersistentActor
。这个问题的正确方法是什么?这些信息应该缓存在视图中,或者有一种方法可以让我以相反的顺序懒惰地查询事件?
谢谢!
我有一个PersistentActor
事件persist
,我想以相反的顺序阅读它们。更具体地说,我想Source
为以相反顺序发出事件的事件获取一个。这对于回答有关为此发生的最后 N 个事件的查询很有用PersistentActor
。这个问题的正确方法是什么?这些信息应该缓存在视图中,或者有一种方法可以让我以相反的顺序懒惰地查询事件?
谢谢!
如果您的最终目标是“回答有关最后 N 个事件的查询”,那么您可以使用循环缓冲区编写扫描流程:
import scala.collection.immutable
type CircularBuffer[T] = immutable.Vector[T]
def emptyCircularBuffer[T] : CircularBuffer[T] = immutable.Vector.empty[T]
def addToCircularBuffer[T](maxSize : Int)(buffer : CircularBuffer[T], item : T) : CircularBuffer[T] =
if(maxSize > 0)
buffer.drop(buffer.size - maxSize + 1) :+ item
else
buffer
import akka.stream.scaladsl.Flow
def circularBufferFlow[T](N : Int) =
Flow[T].scan(emptyCircleBuffer[T])(addToCircleBuffer[T](N))
此 Flow 将容纳 0 到 N 个元素,并在每次更新时发出一个新的 Buffer:
Source(1 to 10).via(circularBufferFlow[Int](3))
.runWith(Sink.foreach(println))
//Vector()
//Vector(1)
//Vector(1, 2)
//Vector(1, 2, 3)
//Vector(2, 3, 4)
//Vector(3, 4, 5)
//...
要专门获取最后一个缓冲区,您可以使用Sink.last
将整个流程具体化为Future
.
但是,如果您的目标是真正反转 Stream,那么我会认为这是不明智的,因为 Stream 可能会无限持续下去,这将导致您的缓存最终消耗所有 JVM 内存并使应用程序崩溃。