2

是否有任何使用 发出任意数量的连续的、依赖的 http 请求的示例cycle-http

我想从 API 中获取每个页面,其中下一个请求只能使用当前页面中的数据进行。

我试图调整这个使用 Observable.merge() 的答案,但我不确定如何将它连接到cycle-http源和接收器。

参考

4

3 回答 3

3

这是使用Cycle.js@cycle/fetch驱动程序处理任意数量的顺序依赖请求的另一种方法。

(使用GitHub用户 API。用户查询每页返回 30 个用户,sinceURL 参数是用户 ID 号,并从下一个用户 ID 开始查询。)

main首先是带有注释的函数的主要部分:

const listResponse$ = sources.FETCH // response returned from FETCH driver
  .mergeAll()
  .flatMap(res => res.json())
  .scan(
    ((userstotals, users) =>
      [
        userstotals[0] + 1, // page count
        users[29] && users[29].id, // last id on full page
        userstotals[2].concat(users) // collect all users
      ]
    ),
    [0, undefined, []] // default accumulator
  )
  .share(); // allows stream split


// Branch #1 - cycle again for more pages
const listRequest$ = listResponse$
  .filter(users =>
    0 < users[1] && // full page id exists
    maxpages > users[0]; // less than maxpages
  )
  .startWith('initial')
  .map(users =>
    `https:\/\/api.github.com/users?since=${
      (!isNaN(parseInt(users[1], 10)) && users[1]) || // last id full page
      idstart // default id start
    }`
  );


// Branch #2 - display
const dom$ = listResponse$
  .map(userstotals => div(JSON.stringify(userstotals[2])));

(这是一个更新的答案。我意识到scans 可以合并为一个。)

sources解释:首先从属性中拉取响应,将其FETCH展平并拉出JSON,然后scan统计到目前为止已经查询了多少页。稍后将查询的页面数量进行比较,maxpages以便不超过预定数量。接下来获取id完整页面的最后一个(如果存在),最后一个concat是当前用户页面以及到目前为止累积的用户页面集合。在累积响应信息之后share,流可以分成两个分支。

第一个分支用于通过驱动程序重新循环查询FETCH以查询更多页面。但首先filter要检查最后一页和查询的页数。如果 id 不是数字,则已到达最后一页。如果已经到达最后一页并因此没有更多页面要查询,则不要继续。如果查询的页数超过 的值,也不要继续maxpages

第二个分支只是简单地进入累积响应中获取完整的用户列表,然后JSON.stringify将对象转换为虚拟 dom 对象(div方法)发送到 DOM 驱动程序进行显示。


这是完整的脚本:

import Cycle from '@cycle/rx-run';
import {div, makeDOMDriver} from '@cycle/dom';
import {makeFetchDriver} from '@cycle/fetch';

function main(sources) { // provides properties DOM and FETCH (evt. streams)

  const acctok = ''; // put your token here, if necessary
  const idstart = 19473200; // where do you want to start?
  const maxpages = 10;

  const listResponse$ = sources.FETCH
    .mergeAll()
    .flatMap(res => res.json())
    .scan(
      ((userstotals, users) =>
        [
          userstotals[0] + 1, // page count
          users[29] && users[29].id, // last id on full page
          userstotals[2].concat(users) // collect all users
        ]
      ),
      [0, undefined, []]
    )
    .share();

  const listRequest$ = listResponse$
    .filter(function (users) {
      return 0 < users[1] && maxpages > users[0];
    })
    .startWith('initial')
    .map(users =>
      `https:\/\/api.github.com/users?since=${
        (!isNaN(parseInt(users[1], 10)) && users[1]) || // last id full page
        idstart // default id start
      }` //&access_token=${acctok}`
    );

  const dom$ = listResponse$
    .map(userstotals => div(JSON.stringify(userstotals[2])));

  return {
    DOM: dom$,
    FETCH: listRequest$
  };
}

Cycle.run(main, {
  DOM: makeDOMDriver('#main-container'),
  FETCH: makeFetchDriver()
});

(我的第一个答案,留给后代。注意两个scans。)

const listResponse$ = sources.FETCH
  .mergeAll()
  .flatMap(res => res.json())
  .scan(((userscount, users) =>              // <-- scan #1
    [userscount[0] + 1, users]), [0, []]
  )
  .share();

const listRequest$ = listResponse$
  .filter(function (users) {
    return users[1][29] && users[1][29].id &&
      maxpages > users[0];
  })
  .startWith('initial')
  .map(users =>
    `https://api.github.com/users?since=${
      (users[1][users[1].length-1] && users[1][users[1].length-1].id) ||
        idstart
      }`
  );

const dom$ = listResponse$
  .scan(function (usersall, users) {          // <-- scan #2
    usersall.push(users);
    return usersall;
  }, [])
  .map(res => div(JSON.stringify(res)));

通过scaning 一次,我需要获取完整页面的最后一个 id(如果存在),并将其存储在累加器中。

于 2016-05-17T17:33:33.873 回答
1

如果您提供一些代码示例会更好。但是基本逻辑可能是这样的:

  1. 将响应流映射到请求流
  2. 使用初始请求启动请求流

代码将是这样的:

function main (sources){
  const initialRequest = {
    url: 'http://www.google.com'
  };
  
  const request$ = sources.HTTP
  .filter(response$ => /*FILTER LOGIC GOES HERE */)
  .switch()//or you can use flatMap
  .map(response =>/* MAP RESPONSE TO A NEW REQUEST */)
  .startWith(initialRequest);
  
  return {
    HTTP: request$
  };
}

于 2016-05-16T02:53:08.907 回答
0

所以这可能过于复杂了,我应该放弃它以正确尝试Erdal 的答案, 但这是我想出的......

用法

export default function app({HTTP}) {
  const {
    allPagesRequest$: staffPagesReq$,
    latestData$: staff$,
  } = getAllPages({url: '/staff', HTTP});

  // staff$ is converted to vdom...

  return /* sinks */ {
    DOM:  staffPagesReq$,
    HTTP: staffVdom$,
  }
}

执行

const fetchNthPage = (optsIn) => {
  const opts = merge(
    {
      url:  '',
      page: 0,
      HTTP: undefined,
    }, optsIn
  );

  const u = new URI(opts.url)
    .setQuery({'_page': opts.page.toString()});

  const pageNResponse$ = opts.HTTP
    .filter(
      res$ => res$.request.url === urlForEndpoint(u)
    )
    .flatMap(
      res$ => res$.catch(
        err => Observable.of(
          {
            body: {'error in response:': err.toString()}
          }
        )
      )
    )
    .map(res => res.body)
    .take(1)
    .shareReplay(1);

  return Observable.of({
    pageNRequest$:  Observable.of(basicRequestObject(u)),
    pageNResponse$: pageNResponse$,
    opts:           opts
  });
};


const encapsulateAs = typeName => data => {
  return {type: typeName, data}
};


const fetchAllPagesIndividually = (optsIn) => {
  const opts = merge(
    {
      url:  '',
      page: 0,
      HTTP: undefined,
    },
    optsIn
  );

  return Observable.defer(
    () => fetchNthPage(opts)
      .flatMap(x => {
        const mergedItems$ = Observable
          .merge(
            x.pageNRequest$.map(encapsulateAs('request')),
            x.pageNResponse$.map(encapsulateAs('response'))
          );


        const optsForNextPage = merge(opts, {
          page: opts.page + 1
        });

        const next$ = Observable
          .never() // `next$` shouldn't end when `pageNResponse$` does
          .merge(x.pageNResponse$)
          .shareReplay(1)
          .takeWhile(res => {
            //noinspection UnnecessaryLocalVariableJS
            let isFullPage = path(['response', 'length'], res) === apiPageSize;
            return isFullPage;
          })
          .flatMap(() => {
            return fetchAllPagesIndividually(optsForNextPage)
          });

        //noinspection UnnecessaryLocalVariableJS
        const concattedItem$ = Observable
          .concat(
            mergedItems$,
            next$
          );

        return concattedItem$
      })
      .shareReplay(1)
  );
};


const concatPages = (acc, currentVal, index, source) => acc.concat(currentVal);

const typeIs = typeStr => compose(
  equals(typeStr),
  prop('type')
);

const typeNotIn = typesArray => compose(
  not,
  unary(flip(contains)(typesArray)),
  prop('type')
);

const getAllPages = (optsIn) => {
  const f$ = fetchAllPagesIndividually(optsIn)
    .shareReplay(1);

  const allPagesRequest$ = f$
    .filter(typeIs('request'))
    .map(prop('data'));

  const allPagesResponse$ = f$
    .filter(typeIs('response'))
    .map(prop('data'));

  const theRest$ = f$
    .filter(typeNotIn(['request', 'response', 'data']));

  const latestData$ = allPagesResponse$
    .map(prop('response'))
    .scan(concatPages);

  return {
    allPagesRequest$,
    allPagesResponse$,
    latestData$,
    theRest$,
  }
};

compose(), not(), merge(),unary()等来自Ramda。

于 2016-05-17T10:28:07.567 回答