3

我想要实现的是用 akka 流实现类似同步反馈循环的东西。

假设你有一个Flow[Int].filter(_ % 5 == 0). 当您将Int's 流广播到此流并直接在其后面压缩元组时,您会得到类似

(0,0)
(5,1)
(10,2)

有没有办法发出一个Option[Int],它指示在我推动下一个元素通过它之后流是否发出一个元素?

(Some(0),0)
(None, 1)
(None, 2)
(None, 3)
(None, 4)
(Some(5), 5)
(None, 6)
...

我想过实现我自己DetachedStage的右前后后Flow保持一个状态,每当流量拉到之前的舞台上时,我就知道他需要下一个元素。当后面的舞台没有收到元素时,它是None。

不幸的是,结果并不好,并且被许多职位所淘汰。

旁注

过滤器流只是一个例子,它可以是一个非常长的流,我无法提供Option在它的每个阶段发出的能力,所以我真的必须知道,流是否推动了下一个或没有而是从下游请求下一个

我也玩过conflateand expand,但这些结果的位置偏移更糟

我在配置中更改的一件事是流的initial缓冲区max,因此我可以确定指示的需求确实是在我推动它的元素之后。

很高兴获得有关如何解决此问题的一些建议!

4

1 回答 1

2

我无法准确地生产您正在寻找的东西。但是我可以确定您正在寻找的未来,例如:

(Future(Some(0)), 0)
(Future(None)   , 1)
(Future(None)   , 2)
...

扩展您的示例,如果给定一个无法更改的流:

val flow = Flow[Int].filter(_ % 5 == 0)

然后可以在单个输入上评估此流程,并将结果转换为Option

import scala.concurrent.{Future, Promise}
import akka.stream.{Materializer, ActorMaterializer}
import akka.stream.scaladsl.{Source,Sink}

def evalFlow(in : Int, flow : Flow[Int, Int, _])(implicit mat : Materializer, ec : ExecutionContext) = {
  val fut : Future[Int] = 
    Source.single(in)
          .via(flow)
          .runWith(Sink.head) //Throws an Exception if filter fails

  fut.map(Some(_))              //       val => Some(val)
     .fallbackTo(Promise(None)) // Exception => None
} 

此函数返回一个Future[Option[Int]]. 然后我们可以使用评估将结果与输入简单地结合起来:

def evalAndCombine(flow : Flow[Int, Int, _])(in : Int)(implicit mat : Materializer, ec : ExecutionContext) =
  (evalFlow(in, flow), in)//(Future[Option[Int]], Int)

最后,该evalAndCombine函数可以放在您的 Ints 源之后:

import akka.actor.ActorSystem

implicit val actorSystem = ActorSystem()
implicit val mat = ActorMaterializer()
import actorSystem.dispatcher

val exampleSource = Source(() => (1 to 6).toIterator)

val tupleSource = exampleSource map evalAndCombine(flow)

同样,如果您想要 aFuture[(Option[Int], Int)]而不是(Future[Option[Int]], Int),例如:

Future[(Some(0), 0)]
Future[(None   , 1)]
...

然后稍微修改 combine 函数:

def evalAndCombine(flow : Flow[Int, Int, _])(in : Int)(implicit mat : Materializer, ec : ExecutionContext) =
  evalFlow(in, flow) map (option => (option, in))//Future[(Option[Int], Int)]
于 2015-11-26T16:45:04.477 回答