假设我有我想做的并行工作,例如下载多个图像。这些下载中的每一个都由一个SignalProducer
. 我创建它们并使用该Merge
策略将它们展平。当我启动外部生产者并获得一次性设备时,我预计处理它也会处理内部生产者。实际发生的是,在处置后,我的外部生产者停止接收事件,但内部生产者正在执行的实际工作仍在继续,因为它们还没有被处置。
如何让内部生产者完成的实际工作停止?这是一些说明该场景的测试代码:
func producerOfValues(values: [String]) -> SignalProducer<String, NoError> {
return SignalProducer<String, NoError> {observer, disposable in
for value in values {
sleep(1)
print("doing work: \(value)")
sendNext(observer, value)
if disposable.disposed { // 'disposed' is never true
print("producer disposed")
break
}
}
sendCompleted(observer)
}
}
let (signal, sink) = SignalProducer<SignalProducer<String, NoError>, NoError>.buffer(2)
let flattened = signal.flatten(.Merge)
let letterProducer = producerOfValues(["a", "b", "c"])
let numberProducer = producerOfValues(["1", "2", "3"])
sendNext(sink, letterProducer.startOn(QueueScheduler()))
sendNext(sink, numberProducer.startOn(QueueScheduler()))
sendCompleted(sink)
let disposable = flattened.startWithNext {
print($0)
}
disposable.dispose()
输出:
doing work: a
doing work: 1
doing work: b
doing work: 2
doing work: c
doing work: 3