此函数的目标是创建一个流,该流定期发出值,直到遇到与谓词匹配的值。
这是我想出的一些骨架代码:
class Watcher<T : Any>(
/**
* Emits the data associated with the provided id
*/
private val callable: (id: String) -> T,
/**
* Checks if the provided value marks the observable as complete
*/
private val predicate: (id: String, value: T) -> Boolean
) {
private val watchPool: MutableMap<String, Observable<T>> = ConcurrentHashMap()
fun watch(id: String): Observable<T> {
// reuse obesrvable if exists
val existing = watchPool[id]
if (existing != null)
return existing
val value = callable(id)
if (predicate(id, value)) return Observable.just(value)
// create new observable to fetch until complete,
// then remove from the map once complete
val observable = Observable.fromCallable<T> {
callable(id)
}.repeatWhen { /* What to put here? */ }.doOnComplete {
watchPool.remove(id)
}.distinctUntilChanged()
watchPool[id] = observable
return observable
}
}
例如,如果我有以下枚举:
enum class Stage {
CREATED, PROCESSING, DELIVERING, FINISHED
}
还有一些可以检索正确阶段的可调用对象,我应该能够传递可调用对象和检查 if 的谓词stage == FINISHED
,并进行轮询,直到我得到FINISHED
事件。
我遇到的问题是当收到的事件不是最终事件时生成一个可观察的。在这种情况下,observable 应该继续轮询事件,直到它接收到与谓词匹配的事件或直到它没有更多的订阅者。
这个可观察的应该:
- 在收到至少一个订阅者之前不要轮询
- 每 x 秒轮询一次
predicate
如果返回 true ,则将自身标记为完成- 如果从 >0 个订阅者变为 0 个订阅者,则完成自身
使用监视池只是为了确保监视同一个 id 的两个线程不会轮询两次。从地图中删除可观察对象也只是为了不堆积。出于同样的原因,只发出一个变量的 observable 不会被存储以供参考。
如何为上面添加的点添加功能?我将链接到一个我发现有用的现有RxJava Github 问题,但据我所知,它不允许处理由可调用对象发出的值的谓词。