假设我们可以异步获取固定数量的消息(一个请求,包含 N 个元素)
func fetchMessages(max: UInt, from: Offset) -> SignalProducer<Message,NoError>
现在,我想把它变成一个无界的,当前一个流完成时SignalProducer
会懒惰地调用。fetchMessages
func stream(from: Offset) -> SignalProducer<Message, NoError> {
// challenge is to implement this function
}
一个可行但仍需要预先计算所有范围的初步想法是通用化以下代码
func lazyFetchFrom(from: Offset) -> SignalProducer<Message,NoError> {
return SignalProducer<Message,NoError> { (observer, disposable) in
fetchMessages(from).start(observer)
}
}
let lazyStream =
fetchMessages(1000, from)
.concat(lazyFetchFrom(from + 1000))
.concat(lazyFetchFrom(from + 2000))
.... // could probably be done generically using a flatMap
现在,我想更进一步,并在之前的值被消耗后评估下一次对lazyFetchFrom 的调用。那可能吗?
谢谢
PS:要清楚,我主要关心的是提供某种背压,以便生产者与消费者相比不会生产得太快
编辑:这是我实现一些背压的最新尝试。但是,当我们观察到信号时,背压消失了,一切都在内存中排队