1

我正在使用Retorfit+建立网络RxJava2,我想将响应缓存 30 秒。在 30 秒间隔后进行的任何调用都应从服务器获取最新结果。我尝试使用运营商执行此Replay操作,但每次我调用订阅时它仍然会进行网络调用。我不是 RxJava 方面的专家,所以我对使用Replay缓存的理解可能是错误的。

public Observable<Name> getName() {
        return retrofitBuilder.getName()
                .subscribeOn(Schedulers.io())
                .replay(30, TimeUnit.SECONDS,Schedulers.io())
                .autoConnect();
    }

我这样调用上面的代码:

 service.getName()
        .subscribe(new Consumer<Name>()
            {
                @Override
                public void accept(Name name) throws Exception
                {
                    Log.d("getName", "Name: " + name.toString());
                }
            }
            , new Consumer<Throwable>()
            {
                @Override
                public void accept(Throwable throwable) throws Exception
                {
                    Log.d("getName", throwable.getMessage());
                }
            });

更新:如果我没有清楚地解释我的问题,我深表歉意。我想要的是在特定请求上缓存,而不是在HttpClient将缓存策略应用于通过它发出的所有请求的级别上缓存它。最后,我想在需要时为不同的请求定义不同的缓存过期时间。并非我的所有请求都需要在短时间内缓存。我想知道我是否可以做到这一点。

感谢您在这方面的帮助。

4

2 回答 2

2

您的方法有两个问题:

  1. 正如@drhr 提到的,Observable每次调用时 您都在创建一个新实例,service.getName()您正在创建一个新的实例Observable,您应该保持相同的重播实例,并在每次调用时将相同实例之外的调用者提供给调用者service.getName()
  2. 即使您将在replay30 秒内返回相同的实例,也会重播源在过去 30 秒内发出的序列Observable,这意味着在缓存到期时间之后,您将一无所获,因为您的请求发生在 30 秒以上。这并不意味着Observable在这段时间之后会自动重启。

为了缓存特定时期,您基本上需要在缓存期之后使缓存的响应无效,并在此期间之后执行新的请求,这意味着您应该控制您的订阅,并在那里进行。
您可以通过以下方式实现它:

public class CachedRequest<T> {

    private final AtomicBoolean expired = new AtomicBoolean(true);
    private final Observable<T> source;
    private final long cacheExpirationInterval;
    private final TimeUnit cacheExpirationUnit;
    private Observable<T> current;

    public CachedRequest(Observable<T> o, long cacheExpirationInterval,
                         TimeUnit cacheExpirationUnit) {
        source = o;
        current = o;
        this.cacheExpirationInterval = cacheExpirationInterval;
        this.cacheExpirationUnit = cacheExpirationUnit;
    }

    private Observable<T> getCachedObservable() {
        return Observable.defer(() -> {
            if (expired.compareAndSet(true, false)) {
                current = source.cache();
                Observable.timer(cacheExpirationInterval, cacheExpirationUnit)                          
                        .subscribe(aLong -> expired.set(true));
            }
            return current;
        });
    }
}

使用 defer 您可以Observable根据缓存到期状态返回权限,因此在缓存到期内发生的每个订阅都将被缓存Observable(使用cache()) - 意味着请求将只执行一次。缓存过期后,额外的订阅会触发新的请求,并会设置一个新的定时器来重置缓存过期。

于 2017-04-04T05:18:32.180 回答
1

尝试查看 okhttp 拦截器。

添加缓存拦截器:

public class CacheInterceptor implements Interceptor {
    @Override
    public Response intercept(Chain chain) throws IOException {
        Response response = chain.proceed(chain.request());

        CacheControl cacheControl = new CacheControl.Builder()
                .maxAge(30, TimeUnit.SECONDS)
                .build();

        return response.newBuilder()
                .removeHeader("Pragma")
                .removeHeader("Cache-Control")
                .header("Cache-Control", cacheControl.toString())
                .build();
    }
}

并将其添加并缓存到您的 OkHttp 客户端,如下所示:

File httpCacheDirectory = new File(context.getCacheDir(), "http-cache");
int cacheSize = 10 * 1024 * 1024; // 10 MiB
Cache cache = new Cache(httpCacheDirectory, cacheSize);

OkHttpClient httpClient = new OkHttpClient.Builder()
                               .addNetworkInterceptor(new CacheInterceptor())
                               .cache(cache)
                               .build();
于 2017-04-03T20:30:00.790 回答