2

例如,这个 jsbin 示例的代码:

const pingEpic = action$ =>
  action$.ofType(PING)
    .delay(1000) // Asynchronously wait 1000ms then continue
    .mapTo({ type: PONG })
    .takeUntil(action$.ofType(CANCEL));

当我takeUntil如上所述使用时,在调度CANCEL动作并延迟 1 秒后,动作不再触发。为什么?

4

1 回答 1

12

问题在于对 RxJS 工作原理的微妙但严重的误解——但不要害怕,这很常见。

所以给出你的例子:

const pingEpic = action$ =>
  action$.ofType(PING)
    .delay(1000)
    .mapTo({ type: PONG })
    .takeUntil(action$.ofType(CANCEL));

这个史诗的行为可以描述为过滤掉所有与 type 不匹配的动作PING。当一个动作匹配时,等待 1000 毫秒,然后将该动作映射到另一个动作{ type: PONG },该动作将被发出,然后由 redux-observable 分派。如果在应用程序运行期间的任何时候,有人调度了一个类型为 的动作,CANCEL则从源取消订阅,这意味着整个链将取消订阅,从而终止史诗。

如果您强制执行,看看它的外观可能会有所帮助:

const pingEpic = action$ => {
  return new Rx.Observable(observer => {
    console.log('[pingEpic] subscribe');
    let timer;

    const subscription = action$.subscribe(action => {
      console.log('[pingEpic] received action: ' + action.type);

      // When anyone dispatches CANCEL, we stop listening entirely!
      if (action.type === CANCEL) {
        observer.complete();
        return;
      }

      if (action.type === PING) {
        timer = setTimeout(() => {
          const output = { type: PONG };
          observer.next(output);
        }, 1000);
      }
    });

    return {
      unsubscribe() {
        console.log('[pingEpic] unsubscribe');
        clearTimeout(timer);
        subscription.unsubscribe();
      }
    };
  });
};

您可以在此处使用假商店运行此代码:http: //jsbin.com/zeqasih/edit ?js,console


相反,您通常想要做的是将您想要取消的订阅者链与假设无限期收听的顶级链隔离开来。尽管您的示例(从文档中修改)是人为的,但让我们先来看看它。

在这里,我们使用mergeMap运算符让我们采取匹配的操作并映射到另一个单独的可观察链

演示: http: //jsbin.com/nofato/edit ?js,output

const pingEpic = action$ =>
  action$.ofType(PING)
    .mergeMap(() =>
      Observable.timer(1000)
        .takeUntil(action$.ofType(CANCEL))
        .mapTo({ type: PONG })
    );

我们Observable.timer通常会等待 1000 毫秒,然后将它发出的值(恰好是数字 0,但这在这里并不重要)映射到我们的PONG操作。我们还说我们想从计时器源中“获取”,直到它正常完成或者我们收到一个类型的动作CANCEL

这将隔离链,因为mergeMap将继续订阅您返回的可观察对象,直到它出错或完成。但是当这种情况发生时,它本身并不会停止订阅你应用它的源;在action$.ofType(PING)这个例子中。

一个更真实的例子是在取消部分的 redux-observable 文档中

在这里,我们将 .takeUntil() 放在了 .mergeMap() 之后,但在 AJAX 调用之后;这很重要,因为我们只想取消 AJAX 请求,而不是阻止 Epic 监听任何未来的操作。

const fetchUserEpic = action$ =>
  action$.ofType(FETCH_USER)
    .mergeMap(action =>
      ajax.getJSON(`/api/users/${action.payload}`)
        .map(fetchUserFulfilled)
        .takeUntil(action$.ofType(FETCH_USER_CANCELLED))
    );

这一切可能听起来令人困惑,但就像最强大的东西一样,一旦你得到它,它就会变得直观。Ben Lesh 在他最近的演讲中出色地解释了 Observables 的工作原理,包括讨论运营商如何成为 Observables 链,甚至是关于隔离订阅者链。尽管演讲是在 AngularConnect 上进行的,但它并不是特定于 Angular 的。


顺便说一句,重要的是要注意您的史诗不会吞噬或以其他方式阻止动作到达减速器,例如,当您将传入动作映射到另一个不同的动作时。事实上,当你的史诗收到一个动作时,它已经通过你的 reducers 了。把你的史诗想象成一个边车进程,它监听你的应用程序动作流,但不能阻止正常的 redux 事情发生,它只能发出新的动作。

于 2016-11-18T01:24:17.660 回答