1

所以这是我试图弄清楚如何使用 rxjs 实现的场景:

  1. 从文件/数据库/等加载一些元数据。元数据中的每个元素都有一个 id 和附加信息——比如实际数据的位置。目前,我在应用程序启动时异步加载所有这些元数据。加载此数据后,Observable 调用完成。最终我可能会添加刷新功能

  2. 在应用程序的稍后时间点,我将需要根据元数据中可用的内容加载特定的数据集。我目前正在尝试使用fetchData(ids:string[]):Observable 之类的函数来执行此操作。这是我不清楚如何在 rxjs 范式下进行的地方。我同样不确定如何使用fetchDatum(id:string):Observable 之类的函数请求单个项目

我当然可以使用 filter 操作符,只处理 IMetadata Observable 发出的、名称与列表中某一项匹配的 IMetadata 项目——但我还需要确认所有被请求的项目都已在 IMetadata Observable 发出的项目中找到;如果没有全部找到,就需要报错。

因此,如果有人请求 id = "Bob" 的 IMetadata - 但源 Observable 没有发出这样的 IMetadata,那么它需要出错。或者如果他们请求 { "Shirley", "Rex", "Samantha" } 并且没有 "Rex" 的数据,那么它应该出错。

我考虑过在这里使用 Rx.Subject ,但从我读到的内容来看,这在 rxjs 范式下通常是不可取的。请告知在 rxjs 范式下哪些方法适用于这种情况。谢谢!

4

1 回答 1

1

这是我想出的解决方案。这个函数创建一个依赖于 IBufferEvaluator 的 Observable,由 IBufferEvaluator 告诉它如何处理源 Observable 发出的每个项目:可以将项目追加到缓冲区、跳过该项目、清空缓冲区、将缓冲区内容刷新给订阅者等。如果您找到更好的方法,请告诉我,特别是如果 rxjs 本身有开箱即用的解决方案。谢谢。

import Rx from 'rxjs/Rx';

/**
 * Actions an evaluator can request for the buffer. Ordinals are spelled out
 * explicitly so that reordering members can never silently change their values.
 */
export enum BufferAction {
    /** Append the current emission to the buffer and continue. */
    APPEND = 0,

    /** Do nothing, ignoring the current emission if applicable. */
    SKIP = 1,

    /** Ignore the current emission, if applicable, and flush the existing buffer contents. */
    FLUSH = 2,

    /** Clear the buffer contents. Ignore the current emission, if applicable. */
    CLEAR = 3,

    /** Mark the Observable as complete. The buffer will be cleared upon completion. */
    COMPLETE = 4,

    /** Append the current emission to the buffer prior to flushing it. */
    APPEND_THEN_FLUSH = 5,

    /** Append the current emission to the buffer and then complete. */
    APPEND_THEN_COMPLETE = 6,

    /** Clear the buffer contents and then append the current emission to it. */
    CLEAR_THEN_APPEND = 7,

    /** Flush the buffer contents and then append the current emission to it. */
    FLUSH_THEN_APPEND = 8,

    /** Flush the buffer contents and then mark the Observable as complete. */
    FLUSH_THEN_COMPLETE = 9,

    /** Append the current emission, flush the buffer, and then complete. */
    APPEND_FLUSH_COMPLETE = 10
}

/**
 * Produces a human-readable label for a BufferAction, intended for log and
 * error messages.
 *
 * @param action the action to describe
 * @returns the member name for a known action; otherwise a message that embeds
 *          the unrecognized value
 */
export function bufferActionToString(action: BufferAction):string
{
    // Table of (action, label) pairs, matched with strict equality below.
    const labels: Array<[BufferAction, string]> = [
        [BufferAction.APPEND, "APPEND"],
        [BufferAction.SKIP, "SKIP"],
        [BufferAction.FLUSH, "FLUSH"],
        [BufferAction.CLEAR, "CLEAR"],
        [BufferAction.COMPLETE, "COMPLETE"],
        [BufferAction.APPEND_THEN_FLUSH, "APPEND_THEN_FLUSH"],
        [BufferAction.APPEND_THEN_COMPLETE, "APPEND_THEN_COMPLETE"],
        [BufferAction.CLEAR_THEN_APPEND, "CLEAR_THEN_APPEND"],
        [BufferAction.FLUSH_THEN_APPEND, "FLUSH_THEN_APPEND"],
        [BufferAction.FLUSH_THEN_COMPLETE, "FLUSH_THEN_COMPLETE"],
        [BufferAction.APPEND_FLUSH_COMPLETE, "APPEND_FLUSH_COMPLETE"]
    ];

    for (let i = 0; i < labels.length; i++)
    {
        if (labels[i][0] === action)
        {
            return labels[i][1];
        }
    }

    return "Unrecognized Buffer Action [" + action + "]";
}

/**
 * Decision callbacks consumed by bufferWithEval. Each callback inspects the
 * current state and returns the BufferAction that the operator should carry out.
 * An exception thrown from either callback is forwarded to the subscriber as an
 * error notification.
 */
export interface IBufferEvaluator<T>
{
    /**
     * Called for every value emitted by the source.
     *
     * @param next the value that was just emitted
     * @param buffer a copy of the values buffered so far (mutating it has no
     *               effect on the operator's internal buffer)
     * @returns the action to apply to `next` and/or the buffer
     */
    evalOnNext(next:T, buffer: T[]):BufferAction;

    /**
     * Called when the stream is about to complete, either because the source
     * completed or because evalOnNext returned one of the *_COMPLETE actions.
     *
     * @param buffer a copy of the values still buffered
     * @returns FLUSH or FLUSH_THEN_COMPLETE to emit the remaining buffer before
     *          completing, or CLEAR or COMPLETE to discard it; bufferWithEval
     *          treats any other action as an error
     */
    evalOnComplete(buffer: T[]):BufferAction;
}

/** bufferWithEval.ts
 *  An operator that buffers the emissions from the source Observable. As each emission is received,
 *  it and a copy of the buffered emissions are handed to the evaluator to determine which BufferAction
 *  to take. You can APPEND the current emission value to the end of the buffered emissions, you can FLUSH
 *  the buffered emissions before or after appending the current emission value, you can SKIP the current
 *  emission value, and you can CLEAR the buffer before appending the current emission.
 *
 *  The evalOnNext and evalOnComplete callbacks are expected to return a BufferAction to indicate
 *  which action to take. Any *_COMPLETE action returned by evalOnNext still consults evalOnComplete,
 *  which decides whether the remaining buffer is flushed (FLUSH, FLUSH_THEN_COMPLETE) or dropped
 *  (CLEAR, COMPLETE) before the completion notification; any other action is an error there.
 *
 *  If no evaluatorFactory is supplied, evalOnNext defaults to APPENDing each emission and evalOnComplete
 *  defaults to FLUSH_THEN_COMPLETE, i.e. everything is emitted as a single array on completion.
 *  If evalOnNext or evalOnComplete throw an exception, the Observable will emit the exception as an
 *  error and cease.
 *
 *  Once the resulting Observable has completed or errored, every further notification from the source
 *  is ignored and the source subscription is released. This also holds when the source delivers its
 *  notifications synchronously while it is being subscribed to.
 *
 *  @param source the Observable whose emissions are buffered
 *  @param evaluatorFactory optional factory invoked once per subscription to create the evaluator
 *  @returns an Observable that emits the buffer contents as an array each time a flush is requested
 */
export function bufferWithEval<T>
    (   source: Rx.Observable<T>,
        evaluatorFactory?: () => IBufferEvaluator<T>
    ) : Rx.Observable<T[]>
{
    /** if no evaluatorFactory supplied, use the default evaluatorFactory **/
    const createEvaluator: () => IBufferEvaluator<T> = evaluatorFactory
        ? evaluatorFactory
        : () => ({
            evalOnNext: (_next: T, _buffer: T[]) => BufferAction.APPEND,
            evalOnComplete: (_buffer: T[]) => BufferAction.FLUSH_THEN_COMPLETE
        });

    return new Rx.Observable<T[]>((subscriber: Rx.Subscriber<T[]>) =>
    {
        let buffer: T[] = [];
        const evaluator = createEvaluator();

        // Stays null for the whole duration of source.subscribe(), so it must never be
        // dereferenced unconditionally: a synchronous source completes/errors before it is set.
        let upstream: Rx.Subscription | null = null;

        // Set once a terminal notification (complete or error) has been delivered downstream.
        let finished = false;

        /** Unsubscribes from the source once this stream has terminated and the subscription exists. */
        function releaseUpstream(): void
        {
            if (finished && upstream !== null)
            {
                upstream.unsubscribe();
            }
        }

        function append(value: T): void
        {
            buffer.push(value);
        }

        function clear(): void
        {
            // A fresh array is allocated so previously emitted arrays are never mutated.
            buffer = [];
        }

        function flush(): void
        {
            try
            {
                subscriber.next(buffer);
            }
            finally
            {
                // Reset the buffer even if subscriber.next() throws; the exception is NOT
                // swallowed here, it propagates to the caller's catch block.
                clear();
            }
        }

        function next(value: T): void
        {
            if (finished)
            {
                // The source could not be unsubscribed yet (synchronous emission); drop the value.
                return;
            }

            try
            {
                // The evaluator gets a copy so it cannot corrupt the internal buffer.
                const action = evaluator.evalOnNext(value, buffer.slice(0));
                switch(action)
                {
                    case BufferAction.APPEND: { append(value); break; }
                    case BufferAction.SKIP: { break; }
                    case BufferAction.FLUSH: { flush(); break; }
                    case BufferAction.CLEAR: { clear(); break; }
                    case BufferAction.COMPLETE: { complete(); break; }
                    case BufferAction.APPEND_THEN_FLUSH: { append(value); flush(); break; }
                    case BufferAction.APPEND_THEN_COMPLETE: { append(value); complete(); break; }
                    case BufferAction.APPEND_FLUSH_COMPLETE: { append(value); flush(); complete(); break; }
                    case BufferAction.CLEAR_THEN_APPEND: { clear(); append(value); break; }
                    case BufferAction.FLUSH_THEN_APPEND: { flush(); append(value); break; }
                    case BufferAction.FLUSH_THEN_COMPLETE: { flush(); complete(); break; }

                    default: throw new Error("next(): Invalid BufferAction '" + bufferActionToString(action) + "'");
                }
            }
            catch(e)
            {
                error(e);
            }
        }

        function complete(): void
        {
            if (finished)
            {
                return;
            }

            try
            {
                const action = evaluator.evalOnComplete(buffer.slice(0));
                switch(action)
                {
                    // Emit whatever is still buffered before completing.
                    case BufferAction.FLUSH_THEN_COMPLETE:
                    case BufferAction.FLUSH:
                    {
                        flush();
                        break;
                    }

                    // Complete without emitting the remaining buffer.
                    case BufferAction.CLEAR:
                    case BufferAction.COMPLETE:
                    {
                        break;
                    }

                    // Every APPEND/SKIP variant is meaningless here: there is no current emission.
                    default: throw new Error("complete(): Invalid BufferAction '" + bufferActionToString(action) + "'");
                }
            }
            catch(e)
            {
                error(e);
                return;
            }

            finished = true;
            clear();
            try
            {
                subscriber.complete();
            }
            finally
            {
                releaseUpstream();
            }
        }

        function error(err: any): void
        {
            if (finished)
            {
                return;
            }

            finished = true;
            try
            {
                subscriber.error(err);
            }
            finally
            {
                releaseUpstream();
            }
        }

        const subscription = source.subscribe(next, error, complete);
        upstream = subscription;

        // If the source terminated (or the evaluator completed the stream) synchronously during
        // subscribe(), the subscription was not available yet; release it now.
        releaseUpstream();

        return subscription;
    });
}
于 2016-03-15T22:54:02.173 回答