4

我有一个交错流,我将其拆分为单独的顺序流

制片人

int streamCount = 3;

new MyIEnumerable<ElementType>()
.ToObservable(Scheduler.ThreadPool)
.Select((x,i) => new { Key = (i % streamCount), Value = x })
.Subscribe(x => outputs[x.Key].OnNext(x.Value));

outputs[]下面定义了处理流的主题在哪里。用于同时.ObserveOn()处理流(多线程)。

消费者

var outputs = Enumerable.Repeat(0, streamCount).Select(_ => new Subject<char>()).ToArray();                                                                                                     

outputs[0].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 0: {0}", x));
outputs[1].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 1: {0}", x));
outputs[2].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 2: {0}", x));

这段代码的问题是它会尽可能快地读取整个枚举,即使输出流无法赶上。在我的情况下,可枚举是一个文件流,所以这可能会导致使用大量内存。因此,如果缓冲区达到某个阈值,我希望阻止读取。

4

2 回答 2

3

我已经通过在生产者和消费者上使用信号量来解决这个问题,如下所示。但是,我不确定这是否被认为是一个好的解决方案(就 Rx 合同、编程风格等而言)。

var semaphore = new SemaphoreSlim(MAX_BUFFERED_ELEMENTS);

// add to producer (before subscribe)
.Do(_ => semaphore.Wait());

// add to consumer (before subscribe)
.Do(_ => semaphore.Release()))

CancelationToken将 a 传递给调用Wait()并确保在流异常停止时将其取消可能是个好主意?

于 2012-06-23T10:46:45.243 回答
2

我认为您的解决方案非常合理。最大的问题(对上一个问题有一些背景)是您的解决方案的“内部”目前无处不在。只需确保在正确编码时清理以下内容:

  • 将所有内容包装到一个公开单个方法的类中:IDisposable Subscribe(<index>, Action)或者IObservable<element> ToObservable(<index>)). 返回的订阅或返回的 observable 都已经完成了所有的“工作”,即添加的Do操作等等。它下面有一个字典或列表的事实应该与用户完全无关,否则对您的代码的任何更改都需要在整个地方进行更改。

  • ACancelationToken是个好主意,请确保在OnCompleted或上取消它OnError,您可以使用重载来做到这一点Do

于 2012-06-23T22:49:43.700 回答