0

我有一个PersistentActor事件persist,我想以相反的顺序阅读它们。更具体地说,我想Source为以相反顺序发出事件的事件获取一个。这对于回答有关为此发生的最后 N 个事件的查询很有用PersistentActor。这个问题的正确方法是什么?这些信息应该缓存在视图中,或者有一种方法可以让我以相反的顺序懒惰地查询事件?

谢谢!

4

1 回答 1

1

如果您的最终目标是“回答有关最后 N 个事件的查询”,那么您可以使用循环缓冲区编写扫描流程:

import scala.collection.immutable

type CircularBuffer[T] = immutable.Vector[T]

def emptyCircularBuffer[T] : CircularBuffer[T] = immutable.Vector.empty[T]

def addToCircularBuffer[T](maxSize : Int)(buffer : CircularBuffer[T], item : T) : CircularBuffer[T]  =
  if(maxSize > 0)
    buffer.drop(buffer.size - maxSize + 1) :+ item
  else 
    buffer

import akka.stream.scaladsl.Flow

def circularBufferFlow[T](N : Int) = 
  Flow[T].scan(emptyCircleBuffer[T])(addToCircleBuffer[T](N))

此 Flow 将容纳 0 到 N 个元素,并在每次更新时发出一个新的 Buffer:

Source(1 to 10).via(circularBufferFlow[Int](3))
               .runWith(Sink.foreach(println))

//Vector()
//Vector(1)
//Vector(1, 2)
//Vector(1, 2, 3)
//Vector(2, 3, 4)
//Vector(3, 4, 5)
//...

要专门获取最后一个缓冲区,您可以使用Sink.last将整个流程具体化为Future.

但是,如果您的目标是真正反转 Stream,那么我会认为这是不明智的,因为 Stream 可能会无限持续下去,这将导致您的缓存最终消耗所有 JVM 内存并使应用程序崩溃。

于 2016-01-15T19:01:03.427 回答