我正在尝试利用 RxJava 的背压(backpressure)机制,在我的 Android 应用中实现无限滚动。我希望它对外部服务的调用次数与下游请求的数量完全一致(即每调用一次 `request(1)` 只加载一页)。但是在使用 `flatMap` 之后,每次 `subscribe` 都会一次性加载 16 页。
下面是我的代码,以及描述预期行为的测试。由于订阅时的第一次请求(n=16),几乎所有测试都会失败。
import static java.util.Collections.emptyList;
import static org.mockito.Mockito.*;
import static rx.internal.util.UtilityFunctions.identity;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import rx.Observable;
import rx.Producer;
import rx.Subscriber;
import rx.exceptions.Exceptions;
import rx.observers.TestSubscriber;
import rx.subscriptions.SerialSubscription;
public class ServiceObservablesTest {
public static <T> Observable<List<T>> create(DataProvider<T> dataProvider) {
Observable<Observable<List<T>>> metaObservalble = Observable.create(subscriber -> {
AtomicInteger pageNumber = new AtomicInteger();
subscriber.setProducer(n -> {
// at subscribe rxJava makes request for 16 elements - probably because of flatMap
// after first request with 16 elements everything seems to work fine even if i ignore the 'n' param
Observable<List<T>> page = dataProvider.requestPage(pageNumber.getAndIncrement());
subscriber.onNext(page);
});
});
return metaObservalble.flatMap(identity()).takeWhile(page -> !page.isEmpty());
}
public interface DataProvider<T> {
Observable<List<T>> requestPage(int page);
}
private DataProvider provider;
@Before
public void setUp() throws Exception {
provider = Mockito.mock(DataProvider.class);
List<Object> list = Arrays.asList(new Object());
when(provider.requestPage(anyInt())).thenReturn(Observable.just(list));
}
@Test
public void shouldRequestOnlyFirstPageOnSubscribe() {
//given
TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(1);
Observable<List<Object>> flightsObservable = create(provider);
//when
flightsObservable.subscribe(subscriber);
//then
subscriber.assertValueCount(1);
subscriber.assertNotCompleted();
verify(provider, times(1)).requestPage(0);
verify(provider, never()).requestPage(1);
}
@Test
public void shouldRequestNumberOfPagesSpecified() {
//given
int requested_pages = 5;
TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(0);
Observable<List<Object>> flightsObservable = create(provider);
//when
flightsObservable.subscribe(subscriber);
subscriber.requestMore(requested_pages);
//then
subscriber.assertValueCount(requested_pages);
subscriber.assertNotCompleted();
for (int i = 0; i < requested_pages; i++) {
verify(provider, times(1)).requestPage(i);
}
verify(provider, never()).requestPage(requested_pages);
}
@Test
public void shouldCompleteAfterRetrievingEmptyResult() {
//given
int emptyPage = 2;
when(provider.requestPage(emptyPage)).thenReturn(Observable.just(emptyList()));
TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(100);
Observable<List<Object>> flightsObservable = create(provider);
//when
flightsObservable.subscribe(subscriber);
//then
subscriber.assertValueCount(emptyPage);
subscriber.assertCompleted();
verify(provider, times(1)).requestPage(0); //requested at subscribe
for (int i = 1; i <= emptyPage; i++) {
verify(provider, times(1)).requestPage(i);
}
verify(provider, never()).requestPage(emptyPage + 1);
}
@Test
public void shouldRequestNextPageWhenRequestedMore() {
//given
TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(1);
Observable<List<Object>> flightsObservable = create(provider);
//when
flightsObservable.subscribe(subscriber);
subscriber.requestMore(1);
//then
subscriber.assertValueCount(2);
verify(provider, times(1)).requestPage(0);
verify(provider, times(1)).requestPage(1);
verify(provider, never()).requestPage(2);
//when
subscriber.requestMore(1);
//then
subscriber.assertValueCount(3);
subscriber.assertNotCompleted();
verify(provider, times(1)).requestPage(0);
verify(provider, times(1)).requestPage(1);
verify(provider, times(1)).requestPage(2);
verify(provider, never()).requestPage(3);
}
@Test
public void shouldWorkWithMultipleSubscribers() {
//given
TestSubscriber<List<Object>> subscriber1 = new TestSubscriber<>(1);
TestSubscriber<List<Object>> subscriber2 = new TestSubscriber<>(1);
Observable<List<Object>> flightsObservable = create(provider);
//when
flightsObservable.subscribe(subscriber1);
flightsObservable.subscribe(subscriber2);
//then
subscriber1.assertValueCount(1);
subscriber2.assertValueCount(1);
verify(provider, times(2)).requestPage(0);
verify(provider, never()).requestPage(1);
//when
subscriber1.requestMore(1);
//then
subscriber1.assertValueCount(2);
subscriber2.assertValueCount(1);
verify(provider, times(2)).requestPage(0);
verify(provider, times(1)).requestPage(1);
verify(provider, never()).requestPage(2);
//when
subscriber2.requestMore(1);
//then
subscriber1.assertValueCount(2);
subscriber2.assertValueCount(2);
verify(provider, times(2)).requestPage(0);
verify(provider, times(2)).requestPage(1);
verify(provider, never()).requestPage(2);
}
}