0

我有一个流(createDataPointFlow),它是通过执行一个mapAsync收集数据点(通过Sink.seq)构建的,否则我想直接流式传输(即不先收集)。

但是,对我来说,如何在不收集项目的情况下做到这一点并不明显,似乎我需要某种机制来将我的项目直接发布到我正在创建的流程的输出部分,但我是新手,不要不知道如何在没有明确参与者参与的情况下做到这一点,我想避免这种情况。

我怎样才能在不需要先将东西收集到 Sink 的情况下实现这一点?请记住,我想要实现的是完全流式传输,而无需进行显式缓冲Sink.seq(...)

object MyProcess {

  def createDataSource(job:Job, dao:DataService):Source[JobDataPoint,NotUsed] = {
    // Imagine the below call is equivalent to streaming a parameterized query using Slick
    val publisher: Publisher[JobDataPoint] = dao.streamData(Criteria(job.name, job.data))
    // Convert to a Source
    val src: Source[JobDataPoint, NotUsed] = Source.fromPublisher(publisher)
    src
  }

  def createDataPointFlow(dao:DataService, parallelism:Int=1): Flow[Job,JobDataPoint, NotUsed] =
    Flow[Job].mapAsync(parallelism)(job =>
      createDataSource(job,dao).toMat(Sink.seq)(Keep.right).run()
    ).mapConcat(identity)

  def apply(src:Source[Job,NotUsed], dao:DataService,parallelism:Int=5) = RunnableGraph.fromGraph(GraphDSL.create(){ implicit builder =>
    import GraphDSL.Implicits._

    //Source
    val jobs:Outlet[Job] = builder.add(src).out
    //val bcastJobsSrc: Source[Job, NotUsed] = src.toMat(BroadcastHub.sink(256))(Keep.right).run()
    //val bcastOutlet:Outlet[Job] = builder.add(bcastJobsSrc).out

    //Flows
    val bcastJobs:UniformFanOutShape[Job,Job] = builder.add(Broadcast[Job](4))
    val rptMaker = builder.add(MyProcessors.flow(dao,parallelism))
    val dpFlow = createDataPointFlow(dao,parallelism)

    //Sinks
    val jobPrinter:Inlet[Job] = builder.add(Sink.foreach[Job](job=>println(s"[MyGraph] Received job: ${job.name} => $job"))).in
    val jobList:Inlet[Job] = builder.add(Sink.fold(List.empty[Job])((list,job:Job)=>job::list)).in
    val reporter: Inlet[ReportTable] = builder.add(Sink.foreach[ReportTable](r=>println(s"[Report]: $r"))).in

    val dpSink: Inlet[JobDataPoint] = builder.add(Sink.foreach[JobDataPoint](dp=>println(s"[DataPoint]: $dp"))).in

    jobs ~> bcastJobs

    bcastJobs ~> jobPrinter
    bcastJobs ~> jobList
    bcastJobs ~> rptMaker ~> reporter
    bcastJobs ~> dpFlow ~> dpSink
    ClosedShape
  })
}
4

1 回答 1

0

因此,在重新阅读有关可用的各个阶段的文档后,结果发现我需要的是flatMapConcat

def createDataPointFlow(dao:DataService, parallelism:Int=1): Flow[Job,JobDataPoint, NotUsed] =
    Flow[Job].flatMapConcat(createDataSource(_,dao))
于 2017-11-24T20:20:44.673 回答