我有一个交错流,我将其拆分为单独的顺序流。
制片人
int streamCount = 3;
new MyIEnumerable<ElementType>()
.ToObservable(Scheduler.ThreadPool)
.Select((x,i) => new { Key = (i % streamCount), Value = x })
.Subscribe(x => outputs[x.Key].OnNext(x.Value));
outputs[]
下面定义了处理流的主题在哪里。用于同时.ObserveOn()
处理流(多线程)。
消费者
var outputs = Enumerable.Repeat(0, streamCount).Select(_ => new Subject<char>()).ToArray();
outputs[0].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 0: {0}", x));
outputs[1].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 1: {0}", x));
outputs[2].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 2: {0}", x));
这段代码的问题是它会尽可能快地读取整个枚举,即使输出流无法赶上。在我的情况下,可枚举是一个文件流,所以这可能会导致使用大量内存。因此,如果缓冲区达到某个阈值,我希望阻止读取。