1

我正在尝试对 HTTP 请求实施背压策略,以在某些条件下暂时阻止挂起的请求数秒。暂停的逻辑将基于另一个 Observable。

我的研究和理解使我相信pausableBuffered操作员完全符合我的需要。记录在这里http://reactivex.io/documentation/operators/backpressure.html

但是我在 ReactiveX v5 (5.0.0-beta.0) 中找不到这个运算符,迁移指南 (v4 - v5) 似乎表明它们已被删除。如果是这种情况,我如何使用 v5 可用的运算符来达到预期的结果?

4

2 回答 2

3

背压的故事现在已经完全放弃了。

这是获得相同结果的一种方法:

const pausableBuffered = (observable, pauser) => {
    const subj = new rx.Subject();

    let buffer = [];
    const nextEmitter = x => subj.next(x);
    const nextBuffer = x => buffer.push(x);

    let subscriber = nextEmitter;
    observable.subscribe(x => subscriber(x));

    pauser.subscribe(value => {
        if (value) {
            subscriber = nextBuffer;
        } else {
            buffer.forEach(nextEmitter);
            buffer = [];
            subscriber = nextEmitter;
        }
    })

    return subj;
};
于 2016-01-30T05:35:32.630 回答
0

我偶然发现了这个答案,对于我的用例,我把它变成了一个管道

import { Observable, Subject, Subscription } from "rxjs";

export function pausable(pauseToken: Observable<boolean>, startPuased: boolean, lastOnly: boolean) {
    return function <T>(source: Subject<T>): Observable<T> {
        let buffer: T[] = [];
        const nextEmitter = (x: T) => subj.next(x);
        const nextBuffer = (x: any) => buffer.push(x);

        var sourceSubscription: Subscription;
        var pauseSubscription: Subscription;

        var subj = new Subject<T>();

        let subscriber = nextEmitter;
        if (startPuased) {
            subscriber = nextBuffer;
        }
        sourceSubscription = source.subscribe({
            next(value) {
                subscriber(value);
            },
            error(error) {
                subj.error(error);
            },
            complete() {
                subj.complete();
                pauseSubscription?.unsubscribe();
            }
        })

        pauseSubscription = pauseToken.subscribe({
            next(value) {
                if (value) {
                    subscriber = nextBuffer;
                } else {
                    if (lastOnly && buffer.length > 0) {
                        nextEmitter(buffer.pop())
                    } else {
                        buffer.forEach(nextEmitter);
                    }
                    buffer = [];
                    subscriber = nextEmitter;
                }
            },
            complete() {
                sourceSubscription?.unsubscribe();
                pauseSubscription?.unsubscribe();
            }
        });

        return subj;
    }
}

于 2021-03-10T21:57:36.213 回答