0

此函数的目标是创建一个流,该流定期发出值,直到遇到与谓词匹配的值。

这是我想出的一些骨架代码:

class Watcher<T : Any>(
        /**
         * Emits the data associated with the provided id
         */
        private val callable: (id: String) -> T,
        /**
         * Checks if the provided value marks the observable as complete
         */
        private val predicate: (id: String, value: T) -> Boolean
) {

    private val watchPool: MutableMap<String, Observable<T>> = ConcurrentHashMap()

    fun watch(id: String): Observable<T> {
        // reuse obesrvable if exists
        val existing = watchPool[id]
        if (existing != null)
            return existing
        val value = callable(id)
        if (predicate(id, value)) return Observable.just(value)
        // create new observable to fetch until complete,
        // then remove from the map once complete
        val observable = Observable.fromCallable<T> {
            callable(id)
        }.repeatWhen { /* What to put here? */ }.doOnComplete {
            watchPool.remove(id)
        }.distinctUntilChanged()
        watchPool[id] = observable
        return observable
    }

}

例如,如果我有以下枚举:

enum class Stage {
    CREATED, PROCESSING, DELIVERING, FINISHED
}

还有一些可以检索正确阶段的可调用对象,我应该能够传递可调用对象和检查 if 的谓词stage == FINISHED,并进行轮询,直到我得到FINISHED事件。

我遇到的问题是当收到的事件不是最终事件时生成一个可观察的。在这种情况下,observable 应该继续轮询事件,直到它接收到与谓词匹配的事件或直到它没有更多的订阅者。

这个可观察的应该:

  • 在收到至少一个订阅者之前不要轮询
  • 每 x 秒轮询一次
  • predicate如果返回 true ,则将自身标记为完成
  • 如果从 >0 个订阅者变为 0 个订阅者,则完成自身

使用监视池只是为了确保监视同一个 id 的两个线程不会轮询两次。从地图中删除可观察对象也只是为了不堆积。出于同样的原因,只发出一个变量的 observable 不会被存储以供参考。

如何为上面添加的点添加功能?我将链接到一个我发现有用的现有RxJava Github 问题,但据我所知,它不允许处理由可调用对象发出的值的谓词。

4

1 回答 1

1

我最终只使用takeUntil, 并使用观察的间隔方法进行轮询。

abstract class RxWatcher<in T : Any, V : Any> {

    /**
     * Emits the data associated with the provided id
     * At a reasonable point, emissions should return a value that returns true with [isCompleted]
     * This method should be thread safe, and the output should not depend on the number of times this method is called
     */
    abstract fun emit(id: T): V

    /**
     * Checks if the provided value marks the observable as complete
     * Must be thread safe
     */
    abstract fun isCompleted(id: T, value: V): Boolean

    /**
     * Polling interval in ms
     */
    open val pollingInterval: Long = 1000

    /**
     * Duration between events in ms for which the observable should time out
     * If this is less than or equal to [pollingInterval], it will be ignored
     */
    open val timeoutDuration: Long = 5 * 60 * 1000

    private val watchPool: MutableMap<T, Observable<V>> = ConcurrentHashMap()

    /**
     * Returns an observable that will emit items every [pollingInterval] ms until it [isCompleted]
     *
     * The observable will be reused if there is polling, so the frequency remains constant regardless of the number of
     * subscribers
     */
    fun watch(id: T): Observable<V> {
        // reuse observable if exists
        val existing = watchPool[id]
        if (existing != null)
            return existing
        val value = emit(id)
        if (isCompleted(id, value)) return Observable.just(value)
        // create new observable to fetch until complete,
        // then remove from the map once complete
        val observable = Observable.interval(pollingInterval, TimeUnit.MILLISECONDS, Schedulers.io()).map {
            emit(id)
        }.takeUntil {
            isCompleted(id, it)
        }.doOnComplete {
            watchPool.remove(id)
        }.distinctUntilChanged().run {
            if (timeoutDuration > pollingInterval) timeout(timeoutDuration, TimeUnit.MILLISECONDS)
            else this
        }
        watchPool[id] = observable
        return observable
    }

    /**
     * Clears the observables from the watch pool
     * Note that existing subscribers will not be affected
     */
    fun clear() {
        watchPool.clear()
    }

}
于 2018-05-18T03:05:16.343 回答