我正在研究 Akka 流,并做了一个斐波那契发布者 - 订阅者示例,如下所示。但是,我还不太明白需求最初是如何产生的,以及它与订阅者的请求策略有什么关系。有人可以解释一下吗?
斐波那契出版社:
class FibonacciPublisher extends ActorPublisher[Long] with ActorLogging {
private val queue = Queue[Long](0, 1)
def receive = {
case Request(_) => // _ is the demand
log.debug("Received request; demand = {}.", totalDemand)
publish
case Cancel =>
log.info("Stopping.")
context.stop(self)
case unknown => log.warning("Received unknown event: {}.", unknown)
}
final def publish = {
while (isActive && totalDemand > 0) {
val next = queue.head
queue += (queue.dequeue + queue.head)
log.debug("Producing fibonacci number: {}.", next)
onNext(next)
if (next > 5000) self ! Cancel
}
}
}
斐波那契订阅者:
class FibonacciSubscriber extends ActorSubscriber with ActorLogging {
val requestStrategy = WatermarkRequestStrategy(20)
def receive = {
case OnNext(fib: Long) =>
log.debug("Received Fibonacci number: {}", fib)
if (fib > 5000) self ! OnComplete
case OnError(ex: Exception) =>
log.error(ex, ex.getMessage)
self ! OnComplete
case OnComplete =>
log.info("Fibonacci stream completed.")
context.stop(self)
case unknown => log.warning("Received unknown event: {}.", unknown)
}
}
斐波那契应用程序:
val src = Source.actorPublisher(Props[FibonacciPublisher])
val flow = Flow[Long].map { _ * 2 }
val sink = Sink.actorSubscriber(Props[FibonacciSubscriber])
src.via(flow).runWith(sink)
样品运行:问题:最初对 4 的需求是从哪里来的?
2015-10-03 23:10:49.120 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Received request; demand = 4.
2015-10-03 23:10:49.120 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 0.
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 1.
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 1.
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 2.
2015-10-03 23:10:49.122 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 0
2015-10-03 23:10:49.122 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 2
2015-10-03 23:10:49.123 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Received request; demand = 2.
2015-10-03 23:10:49.123 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 2
2015-10-03 23:10:49.124 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 4
2015-10-03 23:10:49.124 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 3.
2015-10-03 23:10:49.125 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 5.