我无法准确地生产您正在寻找的东西。但是我可以确定您正在寻找的未来,例如:
(Future(Some(0)), 0)
(Future(None) , 1)
(Future(None) , 2)
...
扩展您的示例,如果给定一个无法更改的流:
val flow = Flow[Int].filter(_ % 5 == 0)
然后可以在单个输入上评估此流程,并将结果转换为Option
:
import scala.concurrent.{Future, Promise}
import akka.stream.{Materializer, ActorMaterializer}
import akka.stream.scaladsl.{Source,Sink}
def evalFlow(in : Int, flow : Flow[Int, Int, _])(implicit mat : Materializer, ec : ExecutionContext) = {
val fut : Future[Int] =
Source.single(in)
.via(flow)
.runWith(Sink.head) //Throws an Exception if filter fails
fut.map(Some(_)) // val => Some(val)
.fallbackTo(Promise(None)) // Exception => None
}
此函数返回一个Future[Option[Int]]
. 然后我们可以使用评估将结果与输入简单地结合起来:
def evalAndCombine(flow : Flow[Int, Int, _])(in : Int)(implicit mat : Materializer, ec : ExecutionContext) =
(evalFlow(in, flow), in)//(Future[Option[Int]], Int)
最后,该evalAndCombine
函数可以放在您的 Ints 源之后:
import akka.actor.ActorSystem
implicit val actorSystem = ActorSystem()
implicit val mat = ActorMaterializer()
import actorSystem.dispatcher
val exampleSource = Source(() => (1 to 6).toIterator)
val tupleSource = exampleSource map evalAndCombine(flow)
同样,如果您想要 aFuture[(Option[Int], Int)]
而不是(Future[Option[Int]], Int)
,例如:
Future[(Some(0), 0)]
Future[(None , 1)]
...
然后稍微修改 combine 函数:
def evalAndCombine(flow : Flow[Int, Int, _])(in : Int)(implicit mat : Materializer, ec : ExecutionContext) =
evalFlow(in, flow) map (option => (option, in))//Future[(Option[Int], Int)]