31

我有一个以正常方式使用的 RxJS 序列...

但是,在可观察的“onNext”处理程序中,一些操作将同步完成,但其他操作需要异步回调,在处理输入序列中的下一项之前需要等待。

...有点困惑如何做到这一点。有任何想法吗?谢谢!

someObservable.subscribe(
    function onNext(item)
    {
        if (item == 'do-something-async-and-wait-for-completion')
        {
            setTimeout(
                function()
                {
                    console.log('okay, we can continue');
                }
                , 5000
            );
        }
        else
        {
            // do something synchronously and keep on going immediately
            console.log('ready to go!!!');
        }
    },
    function onError(error)
    {
        console.log('error');
    },
    function onComplete()
    {
        console.log('complete');
    }
);
4

3 回答 3

26

您想要执行的每个操作都可以建模为可观察的。甚至同步操作也可以这样建模。然后您可以使用map将您的序列转换为序列序列,然后使用concatAll来展平序列。

someObservable
    .map(function (item) {
        if (item === "do-something-async") {
            // create an Observable that will do the async action when it is subscribed
            // return Rx.Observable.timer(5000);

            // or maybe an ajax call?  Use `defer` so that the call does not
            // start until concatAll() actually subscribes.
            return Rx.Observable.defer(function () { return Rx.Observable.ajaxAsObservable(...); });
        }
        else {
            // do something synchronous but model it as an async operation (using Observable.return)
            // Use defer so that the sync operation is not carried out until
            // concatAll() reaches this item.
            return Rx.Observable.defer(function () {
                return Rx.Observable.return(someSyncAction(item));
            });
        }
    })
    .concatAll() // consume each inner observable in sequence
    .subscribe(function (result) {
    }, function (error) {
        console.log("error", error);
    }, function () {
        console.log("complete");
    });

要回复您的一些评论……在某些时候,您需要对函数流施加一些期望。在大多数语言中,当处理可能是异步的函数时,函数签名是异步的,函数的实际异步与同步性质被隐藏为函数的实现细节。无论您使用的是 javaScript promises、Rx observables、c# Tasks、c++ Futures 等,都是如此。函数最终返回一个 promise/observable/task/future/etc,如果函数实际上是同步的,那么它返回的对象是刚刚完成。

话虽如此,既然这是 JavaScript,你可以作弊:

var makeObservable = function (func) {
    return Rx.Observable.defer(function () {
        // execute the function and then examine the returned value.
        // if the returned value is *not* an Rx.Observable, then
        // wrap it using Observable.return
        var result = func();
        return result instanceof Rx.Observable ? result: Rx.Observable.return(result);
    });
}

someObservable
    .map(makeObservable)
    .concatAll()
    .subscribe(function (result) {
    }, function (error) {
        console.log("error", error);
    }, function () {
        console.log("complete");
    });
于 2014-02-19T10:08:55.487 回答
5

首先,将您的异步操作移出subscribe,它不是为异步操作而设计的。

您可以使用的是mergeMap(alias flatMap) 或 concatMap. (我提到了他们两个,但实际上concatMap参数设置为 1。)设置不同的并发参数很有用,因为有时您希望限制并发查询的数量,但仍然运行几个并发。mergeMapconcurrent

source.concatMap(item => {
  if (item == 'do-something-async-and-wait-for-completion') {
    return Rx.Observable.timer(5000)
      .mapTo(item)
      .do(e => console.log('okay, we can continue'));
    } else {
      // do something synchronously and keep on going immediately
      return Rx.Observable.of(item)
        .do(e => console.log('ready to go!!!'));
    }
}).subscribe();

我还将展示如何对通话进行速率限制。忠告:仅在您实际需要时限制速率,例如在调用外部 API 时,该 API 每秒或分钟只允许一定数量的请求。否则最好只限制并发操作的数量并让系统以最大速度移动。

我们从以下片段开始:

const concurrent;
const delay;
source.mergeMap(item =>
  selector(item, delay)
, concurrent)

concurrent接下来,我们需要为 选择值delay并实现selectorconcurrent并且delay密切相关。例如,如果我们想每秒运行 10 个项目,我们可以使用concurrent = 10and delay = 1000(毫秒),也可以使用concurrent = 5anddelay = 500concurrent = 4and delay = 400。每秒的项目数将始终为concurrent / (delay / 1000)

现在让我们实现selector. 我们有几个选择。我们可以为 设置一个最小的执行时间selector,我们可以给它添加一个恒定的延迟,我们可以在结果可用时立即发出结果,我们可以在最小延迟过去后才发出结果等等。甚至可以使用timeout运算符添加超时。方便。

设置最短时间,尽早发送结果:

function selector(item, delay) {
   return Rx.Observable.of(item)
     .delay(1000) // replace this with your actual call.
     .merge(Rx.Observable.timer(delay).ignoreElements())
}

设置最短时间,延迟发送结果:

function selector(item, delay) {
   return Rx.Observable.of(item)
     .delay(1000) // replace this with your actual call.
     .zip(Rx.Observable.timer(delay), (item, _))
}

添加时间,提前发送结果:

function selector(item, delay) {
   return Rx.Observable.of(item)
     .delay(1000) // replace this with your actual call.
     .concat(Rx.Observable.timer(delay).ignoreElements())
}

添加时间,延迟发送结果:

function selector(item, delay) {
   return Rx.Observable.of(item)
     .delay(1000) // replace this with your actual call.
     .delay(delay)
}
于 2016-07-28T17:01:55.783 回答
0

另一个执行手动异步操作的简单示例。

请注意,这不是一个好的反应实践!如果您只想等待 1000ms,请使用 Rx.Observable.timer 或延迟运算符。

someObservable.flatMap(response => {
  return Rx.Observable.create(observer => {
    setTimeout(() => {
      observer.next('the returned value')
      observer.complete()
    }, 1000)
  })
}).subscribe()

现在,用你的异步函数替换 setTimeout,比如 Image.onload 或 fileReader.onload ...

于 2017-06-12T10:35:37.040 回答