我有一个流(createDataPointFlow),它是通过执行一个mapAsync
收集数据点(通过Sink.seq
)构建的,否则我想直接流式传输(即不先收集)。
但是,对我来说,如何在不收集项目的情况下做到这一点并不明显,似乎我需要某种机制来将我的项目直接发布到我正在创建的流程的输出部分,但我是新手,不要不知道如何在没有明确参与者参与的情况下做到这一点,我想避免这种情况。
我怎样才能在不需要先将东西收集到 Sink 的情况下实现这一点?请记住,我想要实现的是完全流式传输,而无需进行显式缓冲Sink.seq(...)
。
object MyProcess {
def createDataSource(job:Job, dao:DataService):Source[JobDataPoint,NotUsed] = {
// Imagine the below call is equivalent to streaming a parameterized query using Slick
val publisher: Publisher[JobDataPoint] = dao.streamData(Criteria(job.name, job.data))
// Convert to a Source
val src: Source[JobDataPoint, NotUsed] = Source.fromPublisher(publisher)
src
}
def createDataPointFlow(dao:DataService, parallelism:Int=1): Flow[Job,JobDataPoint, NotUsed] =
Flow[Job].mapAsync(parallelism)(job =>
createDataSource(job,dao).toMat(Sink.seq)(Keep.right).run()
).mapConcat(identity)
def apply(src:Source[Job,NotUsed], dao:DataService,parallelism:Int=5) = RunnableGraph.fromGraph(GraphDSL.create(){ implicit builder =>
import GraphDSL.Implicits._
//Source
val jobs:Outlet[Job] = builder.add(src).out
//val bcastJobsSrc: Source[Job, NotUsed] = src.toMat(BroadcastHub.sink(256))(Keep.right).run()
//val bcastOutlet:Outlet[Job] = builder.add(bcastJobsSrc).out
//Flows
val bcastJobs:UniformFanOutShape[Job,Job] = builder.add(Broadcast[Job](4))
val rptMaker = builder.add(MyProcessors.flow(dao,parallelism))
val dpFlow = createDataPointFlow(dao,parallelism)
//Sinks
val jobPrinter:Inlet[Job] = builder.add(Sink.foreach[Job](job=>println(s"[MyGraph] Received job: ${job.name} => $job"))).in
val jobList:Inlet[Job] = builder.add(Sink.fold(List.empty[Job])((list,job:Job)=>job::list)).in
val reporter: Inlet[ReportTable] = builder.add(Sink.foreach[ReportTable](r=>println(s"[Report]: $r"))).in
val dpSink: Inlet[JobDataPoint] = builder.add(Sink.foreach[JobDataPoint](dp=>println(s"[DataPoint]: $dp"))).in
jobs ~> bcastJobs
bcastJobs ~> jobPrinter
bcastJobs ~> jobList
bcastJobs ~> rptMaker ~> reporter
bcastJobs ~> dpFlow ~> dpSink
ClosedShape
})
}