6

当我将一些 Objective-C 代码移植到 Swift 时,我试图更好地理解新Combine框架以及如何使用它来重新创建通用设计模式。

在这种情况下,设计模式是单个对象(Manager、Service 等),任何数量的“客户端”都可以注册为委托以接收回调。这是使用委托的基本 1:Many 模式。

Combine看起来很理想,但示例代码有点薄。下面是一个工作示例,但我不确定它是否正确或按预期使用。特别是,我对对象之间的引用循环很好奇。

class Service {

  let tweets = PassthroughSubject<String, Never>()

  func start() {
    // Simulate the need send to send updates.
    DispatchQueue.global(qos: .utility).async {
      while true {
        self.sendTweet()
        usleep(100000)
      }
    }
  }

  func sendTweet() {
    tweets.send("Message \(Date().timeIntervalSince1970)")
  }
}

class Client : Subscriber {
  typealias Input = String
  typealias Failure = Never

  let service:Service
  var subscription:Subscription?

  init(service:Service) {
    self.service = service

   // Is this a retain cycle?
   // Is this thread-safe? 
    self.service.tweets.subscribe(self) 
  }

  func receive(subscription: Subscription) {
    print("Received subscription: \(subscription)")

    self.subscription = subscription
    self.subscription?.request(.unlimited)
  }

  func receive(_ input: String) -> Subscribers.Demand {
    print("Received tweet: \(input)")
    return .unlimited
  }

  func receive(completion: Subscribers.Completion<Never>) {
    print("Received completion")
  }
}

// Dependency injection is used a lot throughout the 
// application in a similar fashion to this:

let service = Service()
let client = Client(service:service)

// In the real world, the service is started when
// the application is launched and clients come-and-go.

service.start()

输出是:

Received subscription: PassthroughSubject
Received tweet: Message 1560371698.300811
Received tweet: Message 1560371698.4087949
Received tweet: Message 1560371698.578027
...

这甚至与Combine预期的使用方式相近吗?

4

2 回答 2

3

让我们检查一下!最简单的方法是将 deinit 添加到两个类并限制服务的生命周期

class Service {
    
    let tweets = PassthroughSubject<String, Never>()
    
    func start() {
        // Simulate the need send to send updates.
        DispatchQueue.global(qos: .utility).async {
            (0 ... 3).forEach { _ in
                self.sendTweet()
                usleep(100000)
            }
        }
    }
    
    func sendTweet() {
        tweets.send("Message \(Date().timeIntervalSince1970)")
    }
    deinit {
        print("server deinit")
    }
}

现在很容易检查

do {
    let service = Service()
    //_ = Client(service:service)
    
    // In the real world, the service is started when
    // the application is launched and clients come-and-go.
    
    service.start()
}

将按预期完成

server deinit

使用订阅客户端修改它

do {
    let service = Service()
    _ = Client(service:service)
    service.start()
}

你马上就知道结果

Received subscription: PassthroughSubject
Received tweet: Message 1580816649.7355099
Received tweet: Message 1580816649.8548698
Received tweet: Message 1580816650.001649
Received tweet: Message 1580816650.102639

正如你所料,有一个记忆周期:-)

通常,您需要自己的订阅者实现的可能性非常低。

首先修改服务,这样客户端就会知道什么时候没有更多消息到达

func start() {
        // Simulate the need send to send updates.
        DispatchQueue.global(qos: .utility).async {
            // send some tweets
            (0 ... 3).forEach { _ in
                self.sendTweet()
                usleep(100000)
            }
            // and send "finished"
            self.tweets.send(completion: .finished)
        }
    }

然后通过调用他的 .sink 方法在您的发布者中使用“内置”订阅者。.sink 返回 AnyCancelable (它是一个引用类型),您必须将其存储在某处。

var cancelable: AnyCancellable?
do {
    let service = Service()
    service.start()
    
    // client
    cancelable = service.tweets.sink { (s) in
        print(s)
    }
}

现在,一切正常,预期...

Message 1580818277.2908669
Message 1580818277.4674711
Message 1580818277.641886
server deinit

但是可取消呢?让我们检查一下!

var cancelable: AnyCancellable?
do {
    let service = Service()
    service.start()
    
    // client
    cancelable = service.tweets.sink { (s) in
        print(s)
    }
}
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    print(cancelable)
}

它打印

Message 1580819227.5750608
Message 1580819227.763901
Message 1580819227.9366078
Message 1580819228.072041
server deinit
Optional(Combine.AnyCancellable)

因此,如果您不再需要它,则必须“手动”释放它。.sink 又来了!

var cancelable: AnyCancellable?
do {
    let service = Service()
    service.start()
    
    // client
    cancelable = service.tweets.sink(receiveCompletion: { (completion) in
        print(completion)
        // this inform publisher to "unsubscribe" (not necessery in this scenario)
        cancelable?.cancel()
        // and we can release our canceleble
        cancelable = nil
    }, receiveValue: { (message) in
        print(message)
    })
}
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    print(cancelable)
}

和结果

Message 1580819683.462331
Message 1580819683.638145
Message 1580819683.74383
finished
server deinit
nil

Combine 几乎拥有你在真实应用程序中所需的一切,问题是缺乏文档,但互联网上有很多资源可用。

于 2020-02-04T12:39:28.660 回答
1

自定义组合订阅者还应符合可取消协议,该协议提供了一种方法将取消转发到从发布者接收到的订阅对象。这样您就不必公开 Subscription 属性。根据文档:

如果您创建自定义订阅者,则发布者会在您首次订阅时发送一个订阅对象。存储这个订阅,然后当你想取消发布时调用它的 cancel() 方法。创建自定义订阅者时,您应该实现 Cancellable 协议,并让您的 cancel() 实现将调用转发到存储的订阅。 https://developer.apple.com/documentation/combine/receiving_and_handling_events_with_combine

于 2019-07-04T12:40:27.570 回答