我有一个用例,我有固定数量的元素从源流入。这些元素得到处理,然后有一个 Sink,在我们的例子中它是一个 ActorSubscriber。发生的情况是 Source 在发送所有元素后发出完成事件的信号。此事件传播到接收器,接收器触发 onComplete 事件并关闭流工作流。所有这些都发生在元素仍在中间步骤中处理的时候。
一旦中间步骤返回处理后的数据时处理完成,它就找不到订阅者,因为它已经由于触发了 onComplete 事件而关闭。有没有办法我们可以丢弃演员订阅者上的 onComplete 事件并使其始终保持打开状态。
代码片段:
val sample1 = //Sample time series
Source(List(sample1,sample2,sample3))
.map(m => akka.util.ByteString( m.getBytes ))
.to(detectionWorkflow(context))
.run()
val intakeBuffer = b.add(
Flow[ByteString]
.buffer(conf.tcpInboundBufferSize, OverflowStrategy.backpressure)
)
val timeSeries = b.add(
Flow[ByteString]
.via( watch("unpacking") )
.via( unmarshalTimeSeriesData )
.via( watch("unpacked") )
)
val scoring = b.add(
OutlierScoringModel.scoringGraph(planRouterRef = context.planRouter, config = conf)
)
intakeBuffer ~> timeSeries ~> scoring.in
scoring.out0 /*~> publishBuffer*/ ~> Sink.actorSubscriber(SpotlightSubscriber.props)
scoring.out1 ~> logUnrecognized ~> termUnrecognized
演员订阅者:
class SpotlightSubscriber extends ActorSubscriber with ActorLogging {
protected val logger: Logger = Logger(LoggerFactory.getLogger("SpotlightSubscriber"))
protected def requestStrategy = WatermarkRequestStrategy(1)
def receive = {
case OnNext(outlier: Outliers) =>
log.debug("[SpotlightSubscriber] Received : {}", outlier)
case OnError(err: Exception) =>
log.error(err, "[SpotlightSubscriber] Received Exception in Spotlight Stream")
context.stop(self)
case OnComplete =>
log.info("[SpotlightSubscriber] Spotlight Stream Completed!")
context.stop(self)
case _ => }}
评分完成处理工作。正在发生的是具有 3 个元素的源将完整事件发送到接收器。actorSubscriber 的 onComplete 事件在它从源接收到完成事件后被触发。在处理完评分引擎将处理后的结果发回给 actorSubscriber,但由于它不再活跃,我们会收到 deadLetter 消息。