2

我是响应式扩展的新手,我想用它(在 c# 中)来读取一个包含多个交错流的文件。基本上该文件的格式为ABCDABCDABCD.... 我更愿意按顺序读取文件并分离流(即AAA..BBB..等)并并行处理每个流,为每个流使用单独的线程。

必须有某种形式的缓冲以确保每个流可以尽可能保持忙碌(当然在限制范围内)。并非所有流都必须同时开始,在这种情况下,必须为延迟的流跳过许多元素。在这种情况下,缓冲可能会弥合差距。

文件中的元素很小(4 个字节),所以很健谈。因此,我也在寻找一种有效处理这个问题的方法。

我首先创建了一个可枚举的文件来读取文件。这可以提供一个包含流 ID 的结构,或者可以根据顺序(元素数模数流)分离流。后者可能更有效。

4

2 回答 2

3

这个问题到处都是“取决于”,特别是当您谈论性能和效率但提供了一个有点做作的例子时。也就是说,与真实文件相比,您的示例文件非常简单。但是,我将尝试提供一些关于它有用的建议。

这是一种将流转换为Enumerable<char>. 流将应用缓冲,这将一次发送一个结果。这可以提高效率(发回数据块),但在某些时候您需要一次处理一个,它也可能在这里。不要过早地优化。

IEnumerable<char> ReadBytes(Stream stream)
{
    using (StreamReader reader = new StreamReader(stream))
    {
        while (!reader.EndOfStream)
            yield return (char)reader.Read();
    }
}

现在,假设这是“输出”可观察对象的处理代码。首先,我设置了输出 observables,然后在适当的时候订阅它们。请注意,我在这里使用了一个数组,所以我的输出可观察索引是数组索引。如果流索引不能转换为从零开始的索引,也可以使用字典。

var outputs = Enumerable.Repeat(0, 3).Select(_ => new Subject<char>()).ToArray();                                                                                                     

outputs[0].Delay(TimeSpan.FromSeconds(2)).Subscribe(x => Console.WriteLine("hi: {0}", x));
outputs[1].Delay(TimeSpan.FromSeconds(1)).Subscribe(x => Console.WriteLine("ho: {0}", x));
outputs[2].Subscribe(x => Console.WriteLine("he: {0}", x));

注意Subject<char>发送我的元素的使用。这取决于元素的类型,但char在给出的示例中有效。另请注意,我延迟元素只是为了证明一切正常。它们现在是独立的流,你可以对它们做任何你想做的事情。

好的,给定一个文件流:

var file = @"C:\test.txt";
var buffer = 32;
var stream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, buffer);

我现在可以订阅并使用模索引发送到正确的输出流:

ReadBytes(stream)
.ToObservable(Scheduler.ThreadPool)
.Select((x,i) => new { Key = (i % 3), Value = x }) // you can change it up here
.Subscribe(x => outputs[x.Key].OnNext(x.Value));

这里有可能更有效的方法,具体取决于您如何计算目标流,但想法保持不变。

输入文件只包含一行:ABCABCABCABCABCABC

运行程序的输出是:

he: C
he: C
he: C
he: C
he: C
he: C

一秒钟后:

ho: B
ho: B
ho: B
ho: B
ho: B
ho: B

然后又一秒钟:

hi: A
hi: A
hi: A
hi: A
hi: A
hi: A
于 2012-06-21T22:04:29.767 回答
1

以下是我的解决方案,它基于 yamen 的回答。它似乎工作正常,这意味着顺序交错输入被分成多个并行处理(多线程)的顺序流。

但是,我不确定这是否是一个正确的实现(在编程风格、rx 合同等方面)。

const int MAX_BUFFERED_ELEMENTS = 1024;

// number of streams in the file
var numberOfStreams = 8;

// semaphore to limit buffered elements
var semaphore = new SemaphoreSlim(MAX_BUFFERED_ELEMENTS);
var cts = new CancellationTokenSource(); // should be used to cancel (left out of this sample)

// create subjects that are the base of each output stream
var subjects = Enumerable.Repeat(0, numberOfStreams).Select(_ => new Subject<ElementType>()).ToArray();

// create the source stream (reader is IEnumerable<ElementType>)
var observable = reader.ToObservable(Scheduler.ThreadPool).Publish();

// forward elements from source to the output subjects
int stream = 0;
observable.Subscribe(x => { 
    semaphores.Wait(cts.Token);   // wait if buffer is full
    _subjects[stream].OnNext(x);  // forward to output stream
    if (++stream >= numberOfStreams) stream = 0; }); // stream = stream++ % numberOfStreams

// build output streams
subjects.Select(
    (s,i) => s.ObserveOn(Scheduler.ThreadPool) // process on separate threads
    .Do(_ => semaphore.Release())              // signal that element is consumed
    .Subscribe(x => Console.WriteLine("stream: {0}\t element: {1}", i, x)) // debug 'processing'
    );

// start processing!
observable.Connect();
于 2012-06-23T02:59:08.590 回答