26

如何使用 RxJs 在浏览器中执行以下场景:

  • 提交数据到队列进行处理
  • 取回工作 ID
  • 每 1 秒轮询另一个端点,直到结果可用或 60 秒过去(然后失败)

我提出的中间解决方案:

 Rx.Observable
    .fromPromise(submitJobToQueue(jobData))
    .flatMap(jobQueueData => 
      Rx.Observable
            .interval(1000)
            .delay(5000)
            .map(_ => jobQueueData.jobId)
            .take(55)
    )
    .flatMap(jobId => Rx.Observable.fromPromise(pollQueueForResult(jobId)))
    .filter(result => result.completed)
    .subscribe(
      result => console.log('Result', result),
      error =>  console.log('Error', error)
    );
  1. 一旦数据到达或发生错误,是否有没有中间变量的方法来停止计时器?我现在可以引入新的 observable 然后使用takeUntil
  2. flatMap这里的用法在语义上正确吗?也许这整个事情应该被重写而不是与flatMap?
4

5 回答 5

43

从顶部开始,你得到了一个承诺,你会变成一个 observable。一旦这产生了一个值,您希望每秒进行一次调用,直到您收到特定的响应(成功)或直到经过一定的时间。我们可以将此解释的每个部分映射到一个 Rx 方法:

“一旦这产生了一个值” = map/ flatMapflatMap在这种情况下,因为接下来发生的也将是可观察的,我们需要将它们展平)

“每秒一次”=interval

"收到一定的回复" =filter

“或” =amb

“已经过去了一定的时间”=timer

从那里,我们可以像这样拼凑起来:

Rx.Observable
  .fromPromise(submitJobToQueue(jobData))
  .flatMap(jobQueueData =>
    Rx.Observable.interval(1000)
      .flatMap(() => pollQueueForResult(jobQueueData.jobId))
      .filter(x => x.completed)
      .take(1)
      .map(() => 'Completed')
      .amb(
        Rx.Observable.timer(60000)
          .flatMap(() => Rx.Observable.throw(new Error('Timeout')))
      )
  )
  .subscribe(
    x => console.log('Result', x),
    x => console.log('Error', x)
  )
;

一旦我们得到了我们的初始结果,我们将其投射到两个可观察对象之间的竞争中,一个在收到成功的响应时会产生一个值,另一个会在经过一定时间后产生一个值。第二个flatMap是因为.throw在可观察实例上不存在,并且方法Rx.Observable返回一个也需要展平的可观察对象。

事实证明amb/timer组合实际上可以替换为timeout,如下所示:

Rx.Observable
  .fromPromise(submitJobToQueue(jobData))
  .flatMap(jobQueueData =>
    Rx.Observable.interval(1000)
      .flatMap(() => pollQueueForResult(jobQueueData.jobId))
      .filter(x => x.completed)
      .take(1)
      .map(() => 'Completed')
      .timeout(60000, Rx.Observable.throw(new Error('Timeout')))
  )
  .subscribe(
    x => console.log('Result', x),
    x => console.log('Error', x)
  )
;

我省略了.delay您在示例中的内容,因为它没有在您想要的逻辑中描述,但它可以很容易地适合这个解决方案。

所以,直接回答你的问题:

  1. 在上面的代码中,不需要手动停止任何事情,因为当interval订阅者数量下降到零时,就会被处理掉,这将在take(1)or amb/timeout完成时发生。
  2. 是的,您原来的两种用法都是有效的,因为在这两种情况下,您都将 observable 的每个元素投影到新的 observable 中,并且希望将 observable 的结果 observable 扁平化为常规 observable。

这是我用来测试解决方案的 jsbinpollQueueForResult (您可以调整返回的值以获得所需的成功/超时;为了快速测试,时间除以 10)。

于 2016-03-15T13:32:54.673 回答
15

A small optimization to the excellent answer from @matt-burnell. You can replace the filter and take operators with the first operator as follows

Rx.Observable
  .fromPromise(submitJobToQueue(jobData))
  .flatMap(jobQueueData =>
    Rx.Observable.interval(1000)
      .flatMap(() => pollQueueForResult(jobQueueData.jobId))
      .first(x => x.completed)
      .map(() => 'Completed')
      .timeout(60000, Rx.Observable.throw(new Error('Timeout')))

  )
  .subscribe(
    x => console.log('Result', x),
    x => console.log('Error', x)
  );

Also, for people that may not know, the flatMap operator is an alias for mergeMap in RxJS 5.0.

于 2016-11-30T09:24:45.260 回答
2

不是你的问题,但我需要相同的功能

import { takeWhileInclusive } from 'rxjs-take-while-inclusive'
import { of, interval, race, throwError } from 'rxjs'
import { catchError, timeout, mergeMap, delay, switchMapTo } from 'rxjs/operators'

const defaultMaxWaitTimeMilliseconds = 5 * 1000

function isAsyncThingSatisfied(result) {
  return true
}

export function doAsyncThingSeveralTimesWithTimeout(
  doAsyncThingReturnsPromise,
  maxWaitTimeMilliseconds = defaultMaxWaitTimeMilliseconds,
  checkEveryMilliseconds = 500,
) {
  const subject$ = race(
    interval(checkEveryMilliseconds).pipe(
      mergeMap(() => doAsyncThingReturnsPromise()),
      takeWhileInclusive(result => isAsyncThingSatisfied(result)),
    ),
    of(null).pipe(
      delay(maxWaitTimeMilliseconds),
      switchMapTo(throwError('doAsyncThingSeveralTimesWithTimeout timeout'))
    )
  )

  return subject$.toPromise(Promise) // will return first result satistieble result of doAsyncThingReturnsPromise or throw error on timeout
}

例子

// mailhogWaitForNEmails
import { takeWhileInclusive } from 'rxjs-take-while-inclusive'
import { of, interval, race, throwError } from 'rxjs'
import { catchError, timeout, mergeMap, delay, switchMap } from 'rxjs/operators'

const defaultMaxWaitTimeMilliseconds = 5 * 1000

export function mailhogWaitForNEmails(
  mailhogClient,
  numberOfExpectedEmails,
  maxWaitTimeMilliseconds = defaultMaxWaitTimeMilliseconds,
  checkEveryMilliseconds = 500,
) {
  let tries = 0

  const mails$ = race(
    interval(checkEveryMilliseconds).pipe(
      mergeMap(() => mailhogClient.getAll()),
      takeWhileInclusive(mails => {
        tries += 1
        return mails.total < numberOfExpectedEmails
      }),
    ),
    of(null).pipe(
      delay(maxWaitTimeMilliseconds),
      switchMap(() => throwError(`mailhogWaitForNEmails timeout after ${tries} tries`))
    )
  )

  // toPromise returns promise which contains the last value from the Observable sequence.
  // If the Observable sequence is in error, then the Promise will be in the rejected stage.
  // If the sequence is empty, the Promise will not resolve.
  return mails$.toPromise(Promise)
}

// mailhogWaitForEmailAndClean
import { mailhogWaitForNEmails } from './mailhogWaitForNEmails'

export async function mailhogWaitForEmailAndClean(mailhogClient) {
  const mails = await mailhogWaitForNEmails(mailhogClient, 1)

  if (mails.count !== 1) {
    throw new Error(
      `Expected to receive 1 email, but received ${mails.count} emails`,
    )
  }

  await mailhogClient.deleteAll()

  return mails.items[0]
}
于 2019-02-22T19:17:56.977 回答
2

我们也有相同的用例,下面的代码效果很好。

import { timer, Observable } from "rxjs";
import { scan, tap, switchMapTo, first } from "rxjs/operators";

function checkAttempts(maxAttempts: number) {
  return (attempts: number) => {
    if (attempts > maxAttempts) {
      throw new Error("Error: max attempts");
    }
  };
}

export function pollUntil<T>(
  pollInterval: number,
  maxAttempts: number,
  responsePredicate: (res: any) => boolean
) {
  return (source$: Observable<T>) =>
    timer(0, pollInterval).pipe(
      scan(attempts => ++attempts, 0),
      tap(checkAttempts(maxAttempts)),
      switchMapTo(source$),
      first(responsePredicate)
    );
}

如果尝试次数已达到限制,则会引发错误,导致输出流被取消订阅。此外,您仅在不满足定义为 responsePredicate 的给定条件之前发出 http 请求。

示例用法:

import { of } from "rxjs";

import { pollUntil } from "./poll-until-rxjs";

const responseObj = { body: { inProgress: true } };
const response$ = of(responseObj);
// this is to simulate a http call
response$
  .pipe(pollUntil(1000, 3, ({ body }) => !body.inProgress))
  .subscribe(({ body }) => console.log("Response body: ", body));

setTimeout(() => (responseObj.body.inProgress = false), 1500);
于 2020-05-05T21:54:44.460 回答
0

Angular / typescript 从上面重写的解决方案:

export interface PollOptions {
  interval: number;
  timeout: number;
}

const OPTIONS_DEFAULT: PollOptions = {
  interval: 5000,
  timeout: 60000
};
@Injectable()
class PollHelper {
  startPoll<T>(
    pollFn: () => Observable<T>, // intermediate polled responses
    stopPollPredicate: (value: T) => boolean, // condition to stop polling
    options: PollOptions = OPTIONS_DEFAULT): Observable<T> {
    return interval(options.interval)
      .pipe(
        exhaustMap(() => pollFn()),
        first(value => stopPollPredicate(value)),
        timeout(options.timeout)
      );
  }
}

例子:

pollHelper.startPoll<Response>(
  () => httpClient.get<Response>(...),
  response => response.isDone()
).subscribe(result => {
  console.log(result);
});
于 2020-03-22T14:13:45.263 回答