0

我正在使用 akka-grpc 生成客户端绑定。它们通常具有以下形式

func[A, B](in: Source[A]) : Source[B],

即他们消费 aSource[A]并提供 a Source[B]

现在,我想将func它们Flow[A, B]与 akka-stream 一起使用。

4

1 回答 1

0

解决方案是:

  def SourceProcessor[In, Out](f : Source[In, NotUsed] => Source[Out, NotUsed]): Flow[In, Out, NotUsed] = 
    Flow[In].prefixAndTail(0).flatMapConcat { case (Nil, in) => f(in) }

prefixAndTail用来劫持underyling Source

于 2019-03-22T11:34:07.490 回答