1

我有一个 Rx 流,它是来自特定组件的传出更改源。

有时我希望能够在流上进入批处理模式,以便传递给onNext的项目累积并仅在退出批处理模式时传递。

通常我会通过流传递单个项目:

stream.onNext(1); 
stream.onNext(2);

在传递给onNext的项目和在subscribe中接收的项目之间存在一对一的映射,因此前面的代码片段导致两次调用subscribe的值分别为 1 和 2。

我正在寻找的批处理模式可能会像这样工作:

stream.enterBatchMode();
stream.onNext(1);
stream.onNext(2);
stream.exitBatchMode();

在这种情况下,我希望仅使用单个串联数组 [1, 2] 调用一次订阅。

重申一下,我有时只需要批处理模式,有时我会传递未连接的项目。

如何使用 Rx 实现这种行为?

注意:以前我通过onNext传递数组,尽管这主要是为了使在单独模式和批处理模式下类型保持不变。

例如:

stream.onNext([1, 2, 3]); 
stream.onNext([4, 5, 6]);

订阅接收 [1, 2, 3] 然后 [4, 5, 6],但是在批处理模式下订阅接收连接结果 [1, 2, 3, 4, 5, 6]。

因此,当您以这种方式看待它时,它更像是unconcatenatedconcatenated模式(而不是individualbatch-mode)。但我认为问题非常相似。

4

3 回答 3

2

这是 James World 的 c# 解决方案转换为 JavaScript 并针对您的特定问题量身定制的精神。

var source = new Rx.Subject();
source.batchMode = new Rx.BehaviorSubject(false);
source.enterBatchMode = function() { this.batchMode.onNext(true); };
source.exitBatchMode = function() { this.batchMode.onNext(false); };
var stream = source
  .window(source.batchMode
    .distinctUntilChanged()
    .skipWhile(function(mode) { return !mode; }))
  .map(function(window, i) {
    if ((i % 2) === 0) {
      // even windows are unbatched windows
      return window;
    }

    // odd windows are batched
    // collect the entries in an array
    // (e.g. an array of arrays)
    // then use Array.concat() to join
    // them together
    return window
      .toArray()
      .filter(function(array) { return array.length > 0; })
      .map(function(array) {
        return Array.prototype.concat.apply([], array);
      });
  })
  .concatAll();


stream.subscribe(function(value) {
  console.log("received ", JSON.stringify(value));
});

source.onNext([1, 2, 3]);

source.enterBatchMode();
source.onNext([4]);
source.onNext([5, 6]);
source.onNext([7, 8, 9]);
source.exitBatchMode();

source.onNext([10, 11]);

source.enterBatchMode();
source.exitBatchMode();

source.onNext([12]);

source.enterBatchMode();
source.onNext([13]);
source.onNext([14, 15]);
source.exitBatchMode();
<script src="https://getfirebug.com/firebug-lite-debug.js"></script>
<script src="https://rawgit.com/Reactive-Extensions/RxJS/master/dist/rx.all.js"></script>

于 2014-11-26T19:35:12.120 回答
1

詹姆斯的评论可能会提供您正在寻找的答案,但我想提供一个您可能也喜欢的简单替代方案。

我将您的问题读为:“我如何传递 A 型或 B 型的值”?

答案:定义一个具有“either”语义的类型,它包含类型 A 或类型 B 的数据。

var valueOrBuffer = function(value, buffer)
{
  this.Value = value;
  this.Buffer = buffer;
};
var createValue = function(value) { return new valueOrBuffer(value); }
var createBuffer = function(buffer) { return new valueOrBuffer(undefined, buffer); }

stream.OnNext(createValue(1));
stream.OnNext(createValue(2));
stream.OnNext(createValue(3));

stream.OnNext(createBuffer([4, 5, 6]));

无需切换。不过,您的观察者可能需要稍作改动:

stream.Subscribe(function(v) {
  if (v.Value)
    onNextValue(v.Value);
  else
    onNextBuffer(v.Buffer);
});
于 2014-11-24T15:09:06.653 回答
0

一个解决方案......各种各样的。在使用 Window 函数(来自 James 的链接示例)未能实现任何目标之后,我想出了以下解决方案,尽管它是在 C# 中,我不知道如何将它转换为 RxJS 和 Javascript。

如果有人可以使用内置的 Rx 函数来改善这一点,请这样做,我将非常感谢一个更好的解决方案。

我已经实现了一个BatchedObservable类,它是T类型的观察者和IEnumerable T类型的可观察对象。在正常模式下,它将每个T包装在一个传递的数组中。在批处理模式下,它会累积T s 直到退出批处理模式,然后它会传递收集的批处理T s 列表。

    public class BatchedObservable<T> : IObservable<IEnumerable<T>>, IObserver<T>
    {
        bool batching = false;
        List<T> batchedItems;
        List<IObserver<IEnumerable<T>>> observers = new List<IObserver<IEnumerable<T>>>();

        public void EnterBatchMode()
        {
            batching = true;
            batchedItems = new List<T>();
        }

        public void ExitBatchMode()
        {
            batching = false;
            if (batchedItems.Count > 0)
            {
                foreach (var observer in observers)
                {
                    observer.OnNext(batchedItems);
                }
            }
            batchedItems = null;
        }


        public IDisposable Subscribe(IObserver<IEnumerable<T>> observer)
        {
            observers.Add(observer);

            return Disposable.Create(() 
                => observers.Remove(observer)
            );
        }

        public void OnCompleted()
        {
            foreach (var observer in observers)
            {
                observer.OnCompleted();
            }
        }

        public void OnError(Exception error)
        {
            foreach (var observer in observers)
            {
                observer.OnError(error);
            }
        }

        public void OnNext(T value)
        {
            if (batching)
            {
                batchedItems.Add(value);
            }
            else
            {
                foreach (var observer in observers)
                {
                    observer.OnNext(new T[] { value });
                }
            }
        }
    }

示例用法如下。运行时,按任意键切换批处理模式。

    static void Main(string[] args)
    {
        var batched = new BatchedObservable<long>();

        var dataStream = Observable.Interval(TimeSpan.FromSeconds(1));
        dataStream.Subscribe(batched);

        batched.Subscribe(PrintArray);

        var batchModeEnabled = false;
        while (true)
        { 
            Console.ReadKey();

            batchModeEnabled = !batchModeEnabled;
            if (batchModeEnabled)
            {
                batched.EnterBatchMode();
            }
            else
            {
                batched.ExitBatchMode();
            }
        }
    }

    private static void PrintArray<T>(IEnumerable<T> a)
    {
        Console.WriteLine("[" + string.Join(", ", a.Select(i => i.ToString()).ToArray()) + "]");
    }
于 2014-11-25T09:02:50.197 回答