这个问题到处都是“取决于”,特别是当您谈论性能和效率但提供了一个有点做作的例子时。也就是说,与真实文件相比,您的示例文件非常简单。但是,我将尝试提供一些关于它有用的建议。
这是一种将流转换为Enumerable<char>
. 流将应用缓冲,这将一次发送一个结果。这可以提高效率(发回数据块),但在某些时候您需要一次处理一个,它也可能在这里。不要过早地优化。
IEnumerable<char> ReadBytes(Stream stream)
{
using (StreamReader reader = new StreamReader(stream))
{
while (!reader.EndOfStream)
yield return (char)reader.Read();
}
}
现在,假设这是“输出”可观察对象的处理代码。首先,我设置了输出 observables,然后在适当的时候订阅它们。请注意,我在这里使用了一个数组,所以我的输出可观察索引是数组索引。如果流索引不能转换为从零开始的索引,也可以使用字典。
var outputs = Enumerable.Repeat(0, 3).Select(_ => new Subject<char>()).ToArray();
outputs[0].Delay(TimeSpan.FromSeconds(2)).Subscribe(x => Console.WriteLine("hi: {0}", x));
outputs[1].Delay(TimeSpan.FromSeconds(1)).Subscribe(x => Console.WriteLine("ho: {0}", x));
outputs[2].Subscribe(x => Console.WriteLine("he: {0}", x));
注意Subject<char>
发送我的元素的使用。这取决于元素的类型,但char
在给出的示例中有效。另请注意,我延迟元素只是为了证明一切正常。它们现在是独立的流,你可以对它们做任何你想做的事情。
好的,给定一个文件流:
var file = @"C:\test.txt";
var buffer = 32;
var stream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, buffer);
我现在可以订阅并使用模索引发送到正确的输出流:
ReadBytes(stream)
.ToObservable(Scheduler.ThreadPool)
.Select((x,i) => new { Key = (i % 3), Value = x }) // you can change it up here
.Subscribe(x => outputs[x.Key].OnNext(x.Value));
这里有可能更有效的方法,具体取决于您如何计算目标流,但想法保持不变。
输入文件只包含一行:ABCABCABCABCABCABC
运行程序的输出是:
he: C
he: C
he: C
he: C
he: C
he: C
一秒钟后:
ho: B
ho: B
ho: B
ho: B
ho: B
ho: B
然后又一秒钟:
hi: A
hi: A
hi: A
hi: A
hi: A
hi: A