1

我目前正在迁移我的应用程序以在 Swift 中使用并发模型。我想序列化任务以确保它们一个接一个地执行(没有并行性)。在我的用例中,我想收听 NotificationCenter 发布的通知,并在每次发布新通知时执行一个任务。但我想确保没有以前的任务正在运行。这相当于使用 maxConcurrentOperationCount = 1 的 OperationQueue。

例如,我在我的应用程序中使用 CloudKit 和 Core Data,并使用持久历史跟踪来确定商店中发生了哪些变化。在此将本地商店同步到云示例代码中,Apple 使用操作队列来处理历史处理任务(在 CoreDataStack 中)。此 OperationQueue 的最大操作数设置为 1。

private lazy var historyQueue: OperationQueue = {
    let queue = OperationQueue()
    queue.maxConcurrentOperationCount = 1
    return queue
}()

当收到 Core Data 通知时,会向这个串行操作队列中添加一个新任务。因此,如果收到很多通知,它们都会以串行方式一个接一个地执行。

@objc
func storeRemoteChange(_ notification: Notification) {
    // Process persistent history to merge changes from other coordinators.
    historyQueue.addOperation {
        self.processPersistentHistory()
    }
}

在这个加载和显示大数据馈送示例代码中,Apple 使用任务来处理历史更改(在 QuakesProvider 中)。

// Observe Core Data remote change notifications on the queue where the changes were made.
notificationToken = NotificationCenter.default.addObserver(forName: .NSPersistentStoreRemoteChange, object: nil, queue: nil) { note in
    Task {
        await self.fetchPersistentHistory()
    }
}

我觉得第二个项目有问题,因为任务可以按任何顺序发生,不一定按顺序发生(与 OperationQueue 作为 maxConcurrentOperationCount = 1 的第一个项目相反)。

我们应该在某处使用演员来确保方法被串行调用吗?

我想过这样的实现,但我还不是很满意:

actor PersistenceStoreListener {
    let historyTokenManager: PersistenceHistoryTokenManager = .init()
    private let persistentContainer: NSPersistentContainer

    init(persistentContainer: NSPersistentContainer) {
        self.persistentContainer = persistentContainer
    }

    func processRemoteStoreChange() async {
        print("\(#function) called on \(Date.now.formatted(date: .abbreviated, time: .standard)).")
    }
}

收到新通知时将调用 processRemoteStoreChange 方法(AsyncSequence):

notificationListenerTask = Task {
   let notifications = NotificationCenter.default.notifications(named: .NSPersistentStoreRemoteChange, object: container.persistentStoreCoordinator)
   
   for await _ in notifications {
        print("notificationListenerTask called on \(Date.now.formatted(date: .abbreviated, time: .standard)).")
        await self.storeListener?.processRemoteStoreChange()
    }
}
4

1 回答 1

1
于 2022-01-05T01:31:53.837 回答