我有一个 PassthroughSubject,它发送 30 个整数,后跟一条完成消息。
从对象那里收到这些数字后,我生成了一个休眠一秒钟的未来,并以输入数字 * 2 完成。
我使用 .receiveOn 来确保期货同时运行,但这意味着完成消息也同时通过链传播并在所有期货完成之前结束接收器。
那里的任何 RxSwift/Combine 向导都知道我可以如何做到这一点,因此完成消息的接收会因期货完成而延迟?
这是一个实现所描述行为的游乐场:
import Foundation
import Combine
import PlaygroundSupport
/// Setting up the playground
PlaygroundPage.current.needsIndefiniteExecution = true
/// Injects numbers 0-30 into combine message stream, and then sends a finish.
func publishNumbers(to subject: PassthroughSubject<Int, Error>) {
(0..<30).forEach {
subject.send($0)
}
subject.send(completion: .finished)
}
/// Delays for one secont, and completes the future by doubling the input.
func delayAndDoubleNumber(_ int: Int) -> Future<Int, Error> {
return Future<Int, Error> { complete in
sleep(1)
complete(.success(int * 2))
}
}
// Properties involved in Combine processing chain.
let numbersSubject = PassthroughSubject<Int, Error>()
let processingQueue = DispatchQueue.global(qos: .userInitiated)
// Combine processing chain
numbersSubject
.receive(on: processingQueue) //Comment this line to observe that all futures finish, and are collected before the finish message kills the sink.
.flatMap { number in
return delayAndDoubleNumber(number)
}
.collect(4)
.sink(receiveCompletion: { completion in
print("Complete: \(completion)")
}, receiveValue: { value in
print("Received Value: \(value)")
})
publishNumbers(to: numbersSubject)