3

我有一个 PassthroughSubject,它发送 30 个整数,后跟一条完成消息。

从对象那里收到这些数字后,我生成了一个休眠一秒钟的未来,并以输入数字 * 2 完成。

我使用 .receiveOn 来确保期货同时运行,但这意味着完成消息也同时通过链传播并在所有期货完成之前结束接收器。

那里的任何 RxSwift/Combine 向导都知道我可以如何做到这一点,因此完成消息的接收会因期货完成而延迟?

这是一个实现所描述行为的游乐场:

import Foundation
import Combine
import PlaygroundSupport

/// Setting up the playground
PlaygroundPage.current.needsIndefiniteExecution = true

/// Injects numbers 0-30 into combine message stream, and then sends a finish.
func publishNumbers(to subject: PassthroughSubject<Int, Error>) {
    (0..<30).forEach {
        subject.send($0)
    }
    subject.send(completion: .finished)
}
/// Delays for one secont, and completes the future by doubling the input.
func delayAndDoubleNumber(_ int: Int) -> Future<Int, Error> {
    return Future<Int, Error> { complete in
        sleep(1)
        complete(.success(int * 2))
    }
}

// Properties involved in Combine processing chain.
let numbersSubject = PassthroughSubject<Int, Error>()
let processingQueue = DispatchQueue.global(qos: .userInitiated)


// Combine processing chain
numbersSubject
    .receive(on: processingQueue) //Comment this line to observe that all futures finish, and are collected before the finish message kills the sink.
    .flatMap { number in
        return delayAndDoubleNumber(number)
    }
    .collect(4)
    .sink(receiveCompletion: { completion in
        print("Complete: \(completion)")
    }, receiveValue: { value in
        print("Received Value: \(value)")
    })

publishNumbers(to: numbersSubject)

4

2 回答 2

4

从 Xcode 11 beta 3 开始,您不能将并发队列与 Combine 一起使用。您应该可以通过 Xcode 11 GM 进行操作。

Philippe Hausler 是一名 Apple 工程师,在 Combine 工作。他在Swift 官方论坛上说:

另外值得注意的是,DispatchQueue用作调度程序的调度程序必须始终是串行的,以遵守联合运营商的合同。

后来他说:

所以在这里跟进,关于下游事件的传播方式有一些变化。即使 DispatchQueue 是并发的,或者 OperationQueue 不是 maxConcurrentOperations 为 1 的限制,或者任何有效的调度程序是并发的,我们现在也能够满足 1.03 的约束;我们将始终在请求的调度程序上发送序列化事件.receive(on:)。我们稍微偏离规范的另一个警告是,上游事件(例如cancel()request(_:)在我们的世界中)可以同时发生。话虽如此,我们确实以线程安全的方式处理它们。

您可以在 Xcode 11 beta 3 中通过调度到并发队列,然后从您Future的闭包中返回到主队列来使您的并发工作:

import Foundation
import Combine
import PlaygroundSupport

PlaygroundPage.current.needsIndefiniteExecution = true

func delayAndDoubleNumber(_ int: Int) -> Future<Int, Never> {
    return Future<Int, Never> { complete in
        DispatchQueue.global(qos: .userInitiated).async {
            sleep(1)
            DispatchQueue.main.async {
                complete(.success(int * 2))
            }
        }
    }
}

let subject = PassthroughSubject<Int, Never>()

subject
    .flatMap { delayAndDoubleNumber($0) }
    .collect(4)
    .sink(
        receiveCompletion: { print("Complete: \($0)") },
        receiveValue: { print("Received Value: \($0)") })

let canceller = (0 ..< 30).publisher().subscribe(subject)
于 2019-07-11T20:18:29.703 回答
1

免责声明,这可能是对文档的错误解释,但我认为您应该使用subscribe(on:)运算符而不是receive(on:).

苹果文档

与影响下游消息的 receive(on:options:) 相比,subscribe(on:) 改变了上游消息的执行上下文。

我对此的解释是,如果您希望您的事件numbersSubject在您的队列上发出,您可以使用subscribe(on:),例如:

numbersSubject
    .flatMap { number in
        return delayAndDoubleNumber(number)
    }
    .collect(4)
    .subscribe(on: processingQueue)
    .receive(on: RunLoop.main)
    .sink(receiveCompletion: { completion in
        print("Complete: \(completion)")
    }, receiveValue: { value in
        print("Received Value: \(value)")
    })
于 2019-07-11T13:13:19.357 回答