4

我有在 while 循环中执行阻塞操作的代码(从服务器下载一些数据)。客户不知道每一步要返回多少物品。下载 N 个项目时循环中断。

val n = 10
val list = ArrayList<T>()

while (list.size < n) {
    val lastItemId = list.last()?.id ?: 0
    val items = downloadItems(lastItemId)
    list.addAll(items)
}

downloadItems执行阻塞 HTTP 调用并返回列表。现在让我们假设downloadItems更改和新的返回类型是Observable<Item>. 我如何更改代码以使用 RxJava 而无需执行类似的操作blockingGet

4

2 回答 2

10

您可以使用repeatUntil来实现这一点:

var totalItems = 0    
var id = 0
Observable.fromCallable {
            downloadItems(id)
        }
        .flatMap {
            list ->
                totalItems += list.size
                id = list.last()?.id ?: 0
                Observable.just(list)
        }
        .repeatUntil({totalItems > n})
        .subscribe({result -> System.out.println(result) })
于 2018-07-05T08:21:45.353 回答
1

我认为这是优雅的方式

int n = 10;
Observable.range(0,n)
        .flatMap(i -> downloadItems(i))
        .toList()
        .subscribe(itemsList -> itemsList.forEach(item -> System.out.println(item)));
于 2018-07-05T15:26:47.337 回答