这是我想出的解决方案。这个函数创建一个依赖于 IBufferEvaluator 的 Observable,告诉它如何处理源 Observable 发出的每个项目。它可以将项目追加到缓冲区、跳过发出的项目、清除缓冲区、将缓冲区刷新到订阅者等。如果您找到更好的方法来做到这一点,请告诉我,特别是如果它是一个不合时宜的方法盒子 rxjs 解决方案。谢谢。
import Rx from 'rxjs/Rx';
export enum BufferAction {
APPEND, /** Append the current emission to the buffer and continue **/
SKIP, /** Do nothing, ignoring the current emission if applicable **/
FLUSH, /** This will ignore the current emission, if applicable, and flush the existing buffer contents */
CLEAR, /** Clear the buffer contents. Ignore the current emission, if applicable */
COMPLETE, /** Mark the Observable as Complete. The buffer will be cleared upon completion. **/
APPEND_THEN_FLUSH, /** Append the current emission to the buffer prior to flushing it **/
APPEND_THEN_COMPLETE, /** Append the current emission to the buffer and then complete **/
CLEAR_THEN_APPEND, /** Clear the buffer contents and then append the current emission to it */
FLUSH_THEN_APPEND, /** Flush the buffer contents and then append the current emission to it */
FLUSH_THEN_COMPLETE, /** Flush the buffer contents and then mark the Observable as complete */
APPEND_FLUSH_COMPLETE /** Append the current emission, flush the buffer, and then complete */
}
export function bufferActionToString(action: BufferAction):string
{
switch(action)
{
case BufferAction.APPEND: return "APPEND";
case BufferAction.SKIP: return "SKIP";
case BufferAction.FLUSH: return "FLUSH";
case BufferAction.CLEAR: return "CLEAR";
case BufferAction.COMPLETE: return "COMPLETE";
case BufferAction.APPEND_THEN_FLUSH: return "APPEND_THEN_FLUSH";
case BufferAction.APPEND_THEN_COMPLETE: return "APPEND_THEN_COMPLETE";
case BufferAction.CLEAR_THEN_APPEND: return "CLEAR_THEN_APPEND";
case BufferAction.FLUSH_THEN_APPEND: return "FLUSH_THEN_APPEND";
case BufferAction.FLUSH_THEN_COMPLETE: return "FLUSH_THEN_COMPLETE";
case BufferAction.APPEND_FLUSH_COMPLETE: return "APPEND_FLUSH_COMPLETE";
default: return "Unrecognized Buffer Action [" + action + "]";
}
}
export interface IBufferEvaluator<T>
{
evalOnNext(next:T, buffer: T[]):BufferAction;
evalOnComplete(buffer: T[]):BufferAction;
}
/** bufferWithEval.ts
* An Operator that buffers the emissions from the source Observable. As each emission is recieved,
* it and the buffered emissions are evaluated to determine what BufferAction to APPEND. You can APPEND
* the current emission value to the end of the buffered emissions, you can FLUSH the buffered emissions
* before or after appending the current emission value, you can SKIP the current emission value and then
* (optionally) FLUSH the buffer, and you can CLEAR the buffer before or after appending the current emission.
*
* The evalOnNext and evalOnComplete are expected to return a BufferAction to indicate
* which action to take. If no evalOnNext is supplied, it will default to APPENDing each emission. The evalOnComplete
* will default to FLUSH_THEN_COMPLETE. If evalOnNext or evalOnComplete throw an exception, the Observable will emit
* the exception and cease.
*/
export function bufferWithEval<T>
( source: Rx.Observable<T>,
evaluatorFactory?: () => IBufferEvaluator<T>
) : Rx.Observable<T[]>
{
/** if no evaluatorFactory supplied, use the default evaluatorFactory **/
if(!evaluatorFactory)
{
evaluatorFactory = () => {
return {
evalOnNext : function(next: T, buffer: T[]) { return BufferAction.APPEND; },
evalOnComplete : function(buffer: T[]) { return BufferAction.FLUSH; }
};
}
}
return new Rx.Observable<T[]>((subscriber: Rx.Subscriber<T[]>) =>
{
var _buffer = new Array<T>();
var _evaluator = evaluatorFactory();
var _subscription: Rx.Subscription = null;
function append(next: T)
{
_buffer.push(next);
}
function flush()
{
try
{
subscriber.next(_buffer);
}
finally
{
// Ignore any exceptions that come from subscriber.next()
clear();
}
}
function clear()
{
_buffer = new Array<T>();
}
function next(next: T)
{
try
{
var action = _evaluator.evalOnNext(next, _buffer.slice(0));
switch(action)
{
case BufferAction.APPEND: { append(next); break; }
case BufferAction.SKIP: { break; }
case BufferAction.FLUSH: { flush(); break; }
case BufferAction.CLEAR: { clear(); break; }
case BufferAction.COMPLETE: { complete(); break; }
case BufferAction.APPEND_THEN_FLUSH: { append(next); flush(); break; }
case BufferAction.APPEND_THEN_COMPLETE: { append(next); complete(); break; }
case BufferAction.APPEND_FLUSH_COMPLETE: { append(next); flush(); complete(); break; }
case BufferAction.CLEAR_THEN_APPEND: { clear(); append(next); break; }
case BufferAction.FLUSH_THEN_APPEND: { flush(); append(next); break; }
case BufferAction.FLUSH_THEN_COMPLETE: { flush(); complete(); break; }
default: throw new Error("next(): Invalid BufferAction '" + bufferActionToString(action) + "'");
}
}
catch(e)
{
error(e);
}
}
function complete()
{
try
{
var action = _evaluator.evalOnComplete(_buffer.slice(0));
switch(action)
{
case BufferAction.FLUSH_THEN_COMPLETE:
case BufferAction.FLUSH: { flush(); }
case BufferAction.CLEAR:
case BufferAction.COMPLETE: { break; }
case BufferAction.APPEND:
case BufferAction.APPEND_THEN_FLUSH:
case BufferAction.APPEND_THEN_COMPLETE:
case BufferAction.APPEND_FLUSH_COMPLETE:
case BufferAction.SKIP:
case BufferAction.CLEAR_THEN_APPEND:
case BufferAction.FLUSH_THEN_APPEND:
default: throw new Error("complete(): Invalid BufferAction '" + bufferActionToString(action) + "'");
}
clear();
subscriber.complete();
_subscription.unsubscribe();
}
catch(e)
{
error(e);
}
}
function error(err: any)
{
try
{
subscriber.error(err);
}
finally
{
_subscription.unsubscribe();
}
}
_subscription = source.subscribe(next, error, complete);
return _subscription;
});
}