1

假设我有我想做的并行工作,例如下载多个图像。这些下载中的每一个都由一个SignalProducer. 我创建它们并使用该Merge策略将它们展平。当我启动外部生产者并获得一次性设备时,我预计处理它也会处理内部生产者。实际发生的是,在处置后,我的外部生产者停止接收事件,但内部生产者正在执行的实际工作仍在继续,因为它们还没有被处置。

如何让内部生产者完成的实际工作停止?这是一些说明该场景的测试代码:

func producerOfValues(values: [String]) -> SignalProducer<String, NoError> {
    return SignalProducer<String, NoError> {observer, disposable in
        for value in values {
            sleep(1)
            print("doing work: \(value)")
            sendNext(observer, value)
            if disposable.disposed { // 'disposed' is never true
                print("producer disposed")
                break
            }
        }
        sendCompleted(observer)
    }
}

let (signal, sink) = SignalProducer<SignalProducer<String, NoError>, NoError>.buffer(2)
let flattened = signal.flatten(.Merge)

let letterProducer = producerOfValues(["a", "b", "c"])
let numberProducer = producerOfValues(["1", "2", "3"])

sendNext(sink, letterProducer.startOn(QueueScheduler()))
sendNext(sink, numberProducer.startOn(QueueScheduler()))
sendCompleted(sink)

let disposable = flattened.startWithNext {
    print($0)
}
disposable.dispose()

输出:

doing work: a
doing work: 1
doing work: b
doing work: 2
doing work: c
doing work: 3
4

0 回答 0