0

技术背景

我在 Web 浏览器中使用 jQuery 来调用从日志中返回一组条目的 API。

API 请求有两个参数:

  • offset_timestamp:一个整数,指定我想要的最早的可能条目
  • limit:一个整数,指定要返回的记录数

示例请求和响应

Request with parameters:

- offset_timestamp = 100
- limit = 50


curl "https://example.com/log?offset_timestamp=100&limit=5"


Resulting JSON Response:

{
    next_timestamp: 222,
    end_of_log: false,
    entries: [
        {
            timestamp: 111,
            data: { ... }
        },
        {
            timestamp: 112,
            data: { ... }
        },

        ...

        {
            timestamp: 160,
            data: { ... }
        }
    ]
}

如果我使用普通的 jQuery + 回调,我想我必须递归地链接 AJAX 调用。类似于以下内容:

// NOTE: I have NOT tested this code.
//       I just wrote it for illustration purposes

function getBatch(offset, limit, callback) {
    var url = "https://example.com/logs?offset_timestamp=" + offset + "&limit=" + limit;
    var ajaxParams = {
        method: "GET",
        url: url
    };

    jQuery.ajax(ajaxParams))
        .done(function(data) {
            if (data.end_of_log) {
                return callback(data.entries);
            }
            return getBatch(data.next_timestamp, limit, function(entries) { 
                return callback(entries.concat(data.entires));
            });
        });
}

function processEntries(entries) {
    for (var i = 0; i < entries.length; i++) {
        console.log(i.data);
    }
}

getBatch(0, 50, processEntries);

自然,我宁愿拥有一系列 Observables(每个都持有一批条目),这样我就可以flatMap()用来获取所有条目的序列。

问题

如果从 jQuery 调用创建一个 Observable,例如Rx.Observable.fromPromise(jQuery.ajax({...})),是否可以使用 RxJS 将任意数量的这些 Observable 链接在一起,使用response.next_timestamp来自先前调用的值在后续调用的参数中?

4

2 回答 2

1

Rx.Subject允许我们写入(推送数据)以及从(订阅)流中读取。Rx.Subject行为就像Rx.Observable,然而,实例有一个onNext方法可以让我们推送数据。

我们可以将我们的请求建模为Rx.Subject,并让我们的响应流依赖于它。然后我们可以订阅响应并将下一个请求推送到请求流:

// Represents our stream of requests
const request$ = new Rx.Subject();

// Represents our AJAX responses
const response$ = request$
    .startWith(initialRequest)
    .flatMap(makeAjaxCall)

    // Convert to a hot stream so we don't end up making an AJAX
    // request for every subscriber of this stream.
    .share();

// And we can map over response$ to get the updated timestamp
const timestamp$ = response$.map(getTimeStamp);

// And we can subscribe to the timestamp$ stream to push new requests
timestamp$.subscribe(timestamp => {
    const request = makeRequestFromTimestamp(timestamp);

    // Call onNext to push a new item onto our request$ stream
    request$.onNext(request);
});

// And we can subscribe to our responses. Note that our response$
// stream will continue indefinitely.
response$.subscribe(x => console.log(x));
于 2016-03-16T18:48:12.177 回答
1

我会尝试使用expand运算符。您还可以在此处查看使用示例:RxJs: How to loop based on the state of the observable? ,在这里:RxJS,如何轮询 API 以使用动态时间戳持续检查更新的记录

简而言之,它允许您使用 observables 进行递归,通过返回来表示递归结束Rx.Observable.empty()

于 2016-03-12T09:56:16.393 回答