2

Alpakka 提供了一种访问数十种不同数据源的好方法。面向文件的源(例如 HDFS 和 FTP 源)以Source[ByteString, Future[IOResult]. 但是,通过 Akka HTTP 的 HTTP 请求是作为Source[ByteString, NotUsed]. 在我的用例中,我想从 HTTP 源中检索内容,Source[ByteString, Future[IOResult]这样我就可以构建一个统一的资源获取器,它适用于多种方案(在本例中为 hdfs、文件、ftp 和 S3)。

特别是,我想将Source[ByteString, NotUsed]源 转换为Source[ByteString, Future[IOResult]能够从传入字节流计算 IOResult 的位置。有很多类似的方法flatMapConcatviaMat但似乎没有一个能够从输入流中提取细节(例如读取的字节数)或IOResult正确初始化结构。理想情况下,我正在寻找一种具有以下签名的方法,该方法将在流进入时更新 IOResult。

  def matCalc(src: Source[ByteString, Any]) = Source[ByteString, Future[IOResult]] = {
    src.someMatFoldMagic[ByteString, IOResult](IOResult.createSuccessful(0))(m, b) => m.withCount(m.count + b.length))
  }
4

2 回答 2

1

我不记得任何现有的功能,可以开箱即用,但你可以使用也ToMat(令人惊讶的是在akka流文档中没有找到它,虽然你可以在源代码文档和java api中查看它)流功能与Sink.fold一起积累一些价值并最终给予它。例如:

def magic(source: Source[Int, Any]): Source[Int, Future[Int]] =
    source.alsoToMat(Sink.fold(0)((acc, _) => acc + 1))((_, f) => f)

问题是alsoToMat将输入 mat 值与alsoToMat. 同时 source 产生的值不受 sink 的影响alsoToMat

def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[Out, Mat3] =
  viaMat(alsoToGraph(that))(matF)

把这个函数改成 return 并不难,IOResult根据源码:

final case class IOResult(count: Long, status: Try[Done]) { ... }

你需要注意的最后一件事 - 你希望你的来源像:

Source[ByteString, Future[IOResult]]

但是如果你不想在流定义的最后时刻携带这些 mat 值,然后根据这个未来的完成来做 smth,那可能是容易出错的方法。例如,在本例中,我根据该未来完成工作,因此不会处理最后一个值:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object App extends App {

  private implicit val sys: ActorSystem = ActorSystem()
  private implicit val mat: ActorMaterializer = ActorMaterializer()
  private implicit val ec: ExecutionContext = sys.dispatcher

  val source: Source[Int, Any] = Source((1 to 5).toList)

  def magic(source: Source[Int, Any]): Source[Int, Future[Int]] =
    source.alsoToMat(Sink.fold(0)((acc, _) => acc + 1))((_, f) => f)

  val f = magic(source).throttle(1, 1.second).toMat(Sink.foreach(println))(Keep.left).run()
  f.onComplete(t => println(s"f1 completed - $t"))
  Await.ready(f, 5.minutes)


  mat.shutdown()
  sys.terminate()
}
于 2019-01-22T23:47:35.687 回答
1

这可以通过Promise对物化值传播使用 a 来完成。

val completion = Promise[IoResult]
val httpWithIoResult = http.mapMaterializedValue(_ => completion.future)

现在剩下的就是completion在相关数据可用时完成承诺。

另一种方法是下拉到GraphStageAPI,您可以在其中对物化值传播进行较低级别的控制。但即使有使用Promises,通常也是物化价值传播的选择实现。看看内置的运算符实现,例如Ignore.

于 2019-01-21T15:58:36.543 回答