2

我需要多次查询设备。每个查询都需要是异步的,并且设备一次不支持同时查询。而且,一旦被查询,就不能马上再次查询。它需要至少 1 秒的暂停才能正常工作。

我的两个查询,由saveClock()and执行saveConfig(),返回一个 Promise 并且都通过按预期返回 undefined 来解决。

在以下代码中,为什么删除take()阻止toArray()被调用?
这里发生了什么,有没有更好的方法来实现相同的行为?

export const saveEpic = (action$, store) =>
  action$.ofType(SAVE)
    .map(action => {
      // access store and create object data
      // ...
      return data;
    })
    .mergeMap(data =>
      Rx.Observable.from([
        Rx.Observable.of(data).mergeMap(data => saveClock(data.id, data.clock)),
        Rx.Observable.timer(1000),
        Rx.Observable.of(data).mergeMap(data => saveConfig(data.id, data.config)),
        Rx.Observable.of(data.id)
     ])
    )
    .concatAll()
    .take(4)
    .toArray()
    // [undefined, 0, undefined, "id"]
    .map(x => { type: COMPLETED, id: x[3] });
4

1 回答 1

2

我看到了几件事:

您的 final.map()缺少括号,在当前形式下是语法错误,但细微的变化可能会使其意外成为带标签的语句,而不是返回对象。因为在当前形式下它是一个语法错误,我想这只是这篇文章中的一个错误,而不是在你的代码中(它甚至不会运行),但请仔细检查!

// before
.map(x => { type: COMPLETED, id: x[3] });

// after
.map(x => ({ type: COMPLETED, id: x[3] }));

修复后,该示例确实使用简单的 redux-observable 测试用例运行:http: //jsbin.com/hunale/edit ?js,output所以如果没有什么值得注意的我做的与你不同,问题似乎出在代码中假如。随意添加额外的见解,甚至更好,在 JSBin/git 存储库中为我们重现它。


您没有提到但非常值得注意的一件事是,在 redux-observable 中,您的史诗通常是长期存在的“流程管理器”。这个史诗实际上只会处理其中一个保存,然后完成(),这可能不是您真正想要的?用户每次应用程序启动时只能保存一次吗?似乎不太可能。

相反,您需要通过将此逻辑封装在mergeMap. Thetake(4)和传递data.idthen 变得无关紧要:

const saveEpic = (action$, store) =>
  action$.ofType(SAVE)
    .mergeMap(data =>
      Rx.Observable.from([
        Rx.Observable.of(data).mergeMap(data => saveClock(data.id, data.clock)),
        Rx.Observable.timer(1000),
        Rx.Observable.of(data).mergeMap(data => saveConfig(data.id, data.config))
      ])
      .concatAll()
      .toArray()
      .map(() => ({ type: COMPLETED, id: data.id }))
    );

Ben Lesh 在他最近的 AngularConnect 演讲中描述了这种流的分离,在错误的情况下,但它仍然适用:https ://youtu.be/3LKMwkuK0ZE?t=20m (别担心,这不是 Angular 特定的!)

接下来,我想分享一些不请自来的重构建议,它们可能会让你的生活更轻松,但当然这是固执己见,所以请随意忽略:

我会重构以更准确地直观地反映事件的顺序,并降低复杂性:

const saveEpic = (action$, store) =>
  action$.ofType(SAVE)
    .mergeMap(data =>
      Rx.Observable.from(saveClock(data.id, data.clock))
        .delay(1000)
        .mergeMap(() => saveConfig(data.id, data.config))
        .map(() => ({ type: COMPLETED, id: data.id }))
    );

在这里,我们正在消费由返回的 Promise,将saveClock其输出延迟 1000 毫秒,将结果合并映射到一个调用,saveConfig()该调用也将返回一个将被消费的 Promise。然后最终将结果映射到我们的COMPLETE操作。

最后,请记住,如果您的 Epic确实还活着并且寿命很长,那么这个史诗中没有任何内容可以阻止它接收多个 SAVE 请求,而其他请求仍在进行中或尚未用完所需的 1000 毫秒延迟请求之间。即,如果确实需要任何请求之间的 1000 毫秒间隔,那么您的史诗本身并不能完全阻止您的 UI 代码破坏它。在这种情况下,您可能需要考虑添加更复杂的缓冲背压机制,例如使用.zip()带有BehaviorSubject.

http://jsbin.com/waqipol/edit?js,输出

const saveEpic = (action$, store) => {
  // used to control how many we want to take,
  // the rest will be buffered by .zip()
  const requestCount$ = new Rx.BehaviorSubject(1)
    .mergeMap(count => new Array(count));

  return action$.ofType(SAVE)
    .zip(requestCount$, action => action)
    .mergeMap(data =>
      Rx.Observable.from(saveClock(data.id, data.clock))
        .delay(1000)
        .mergeMap(() => saveConfig(data.id, data.config))
        .map(() => ({ type: COMPLETED, id: data.id }))
        // we're ready to take the next one, when available
        .do(() => requestCount$.next(1))
    );
};

这样一来,当我们仍在处理现有请求时,保存请求的请求就会被缓冲,并且我们一次只接收其中一个。但请记住,这是一个无界缓冲区——这意味着待处理操作队列的增长速度可能比刷新缓冲区的速度快得多。这是不可避免的,除非您采用有损背压策略,例如丢弃重叠的请求等。

如果您有其他史诗的重叠要求,即每秒发送请求不超过一次,您将需要创建某种单一的监督者来为所有史诗提供这种保证。

这似乎都非常复杂,但具有讽刺意味的是,这在 RxJS 中比在传统的命令式代码中要容易得多最难的部分实际上是了解这些模式。

于 2016-11-08T18:42:52.257 回答