1

我正在尝试根据一个 ID 将相同的 Flowable 共享给多个订阅者,而每个订阅者都可以在需要时取消订阅以取消订阅。如果某个 id 的所有订阅者都取消订阅,然后另一个订阅者尝试订阅该 id,则应该创建一个新订阅。下面的代码是用 Kotlin 编写的,试图实现这个功能。

class SharedFlowProvider() {
    private val flowProvider = FlowProvider()
    private val eventFlowById = HashMap<String, Flowable<ComputedProperties>>()

    @Synchronized
    override fun subscribeToProperties(subscriber: CancellableSubscriber<ComputedProperties>, id: String){
        eventFlowById.computeIfAbsent(id) { buildFlowable(id) }
                .subscribe(orderBasedSubscriber)
    }

    private fun buildFlowable(id: String) = 
        flowProvider.getFlowable(id)
            .doFinally { removeSubscription(id) }
            .share()

    @Synchronized
    private fun removeSubscription(id: String) {
        eventFlowById.remove(id)
    }
}

当取消订阅,然后订阅相同的 id 时,就会出现问题。由于 share 返回的 FlowableRefCount 的工作方式,我发现这种方法可能存在死锁。在底层实现中,FlowableRefCount 在其 subscribeActual 和 cancel 方法中使用了同步机制,虽然乍一看并不明显,但 doFinally 方法的动作是在该锁定机制内运行的。这有点违反直觉,因为在 doFinally 文档中它说:

请注意,onFinally 操作在订阅之间共享,因此应该是线程安全的。

正因为如此,出现了以下场景:

  • 通过取消锁定 FlowableRefCount 实例
  • subscribeToProperties 调用的 id 与取消中的相同
  • subscribeToProperties 在调用 subscribe(然后是 subscribeActual)时被阻塞,因为已经通过取消在 FlowableRefCount 上获得了锁
  • 调用 removeOrderSubscription 被阻塞,因为 SharedFlowProvider 上的锁定已经通过 subscribeToProperties 方法获得

我还尝试通过以下方式使用 ConcurrentHashMap 将锁定从 subscribeToProperties 方法中分离出来:

private val eventFlowById = ConcurrentHashMap<String, Flowable<ComputedProperties>>()

override fun subscribeToProperties(subscriber: CancellableSubscriber<ComputedProperties>, id: String){
    val flow = eventFlowById.computeIfAbsent(id) { buildFlowable(id) }
    flow.subscribe(orderBasedSubscriber)
    //the flow associated with an id could be removed by a cancel done before the subscribe call, so we need to make sure it is added to the map
    eventFlowById.computeIfAbsent(id) { flow }
}

但是在这种方法中,如果我们在方法的最后一行之前有一个订阅然后快速取消,我们最终可能会将流放入地图中,即使我们没有订阅者。

我将不胜感激有关如何在没有死锁或竞争条件的情况下实现此功能的一些想法。

谢谢

4

0 回答 0