1

I have the following design I'd like to create, but I'm not sure which Rx pattern matches it. The goal is more or less along the lines of a Single, but with a conditional check.

  • There is one Observable<String>, and the possibility of any number of observers.
  • If a request is first made, the observable will execute some network request taking in the string, then emit a callback (much like a completable/single)
  • Any subsequent call with the same key will return the same result immediately
  • However, if 5 minutes has passed and the same call is made, we will refetch the data as it may have expired, then emit it to any listeners. This result will be saved for another 5 minutes, and the cycle repeats.
  • All data is stored based on the key sent, much like a flyweight pattern. Expiration is based off of the last request time of the specific key.

My initial thought was to just make my own class with a concurrent hashmaps. However, this will mean I have to handle a lot of the threading mechanisms myself. I feel like RxJava will be a great solution to this, but I'm not sure if such patterns exist. Does anyone have an idea?

I get that the purpose of a Single<T> is meant to only retrieve a single response, so my terms may not be correct.

The following is my attempt, which I will be updating as I go

/**
 * Created by Allan Wang on 07/01/18.
 *
 * Reactive flyweight to help deal with prolonged executions
 * Each call will output a [Single], which may be new if none exist or the old one is invalidated,
 * or reused if an old one is still valid
 *
 * Types:
 * T    input       argument for caller
 * C    condition   condition to check against for validity
 * R    response    response within reactive output
 */
abstract class RxFlyweight<in T : Any, C : Any, R : Any> {

    /**
     * Given an input emit the desired response
     * This will be executed in a separate thread
     */
    protected abstract fun call(input: T): R

    /**
     * Given an input and condition, check if
     * we may used cache data or if we need to make a new request
     * Return [true] to use cache, [false] otherwise
     */
    protected abstract fun validate(input: T, cond: C): Boolean

    /**
     * Given an input, create a new condition to be used
     * for future requests
     */
    protected abstract fun cache(input: T): C

    private val conditionals = mutableMapOf<T, C>()
    private val sources = mutableMapOf<T, Single<R>>()

    private val lock = Any()

    /**
     * Entry point to give an input a receive a [Single]
     * Note that the observer is not bound to any particular thread,
     * as it is dependent on [createNewSource]
     */
    operator fun invoke(input: T): Single<R> {
        synchronized(lock) {
            val source = sources[input]

            // update condition and retrieve old one
            val condition = conditionals.put(input, cache(input))

            // check to reuse observable
            if (source != null && condition != null && validate(input, condition))
                return source

            val newSource = createNewSource(input).cache()

            sources.put(input, newSource)
            return newSource
        }
    }

    /**
     * Open source creator
     * Result will then be created with [Single.cache]
     * If you don't have a need for cache,
     * you likely won't have a need for flyweights
     */
    open protected fun createNewSource(input: T): Single<R> =
            Single.fromCallable { call(input) }
                    .timeout(20, TimeUnit.SECONDS)
                    .subscribeOn(Schedulers.io())

    fun reset() {
        synchronized(lock) {
            sources.clear()
            conditionals.clear()
        }
    }

}
4

0 回答 0