1

我是 RX 新手。

我想遍历一个 IEnumerable 并发布到在各自线程中处理数据的多个 DataHandlers。

下面是我的示例程序。发布工作并创建了一个新线程,但 3 个 RowHandlers 都在 1 个线程中运行。我需要3个线程。实现这一点的最佳方法是什么?

class Program
{

    public class MyDataGenerator
    {
        public IEnumerable<int> myData()
        {
            //Heavy lifting....Don't want to process more than once.
            yield return 1;
            yield return 2;
            yield return 3;
            yield return 4;
            yield return 5;
            yield return 6;
        }
    }

    static void Main(string[] args)
    {
        MyDataGenerator h = new MyDataGenerator();
        Console.WriteLine("Thread id " + Thread.CurrentThread.ManagedThreadId.ToString());
        //
        var shared = h.myData().ToObservable().Publish();
        ///////////////////////////////
        //  Row Handling Requirements
        //
        //      1. Single Scan of IEnumerable. 
        //      2. Row handlers process data in their own threads. 
        //      3. OK if scanning thread blocks while data is processed
        //

        //Create the RowHandlers
        MyRowHandler rn1 = new MyRowHandler();
        rn1.ido = shared.Subscribe(i => rn1.processID(i));
        MyRowHandler rn2 = new MyRowHandler();
        rn2.ido = shared.Subscribe(i => rn2.processID(i));
        MyRowHandler rn3 = new MyRowHandler();
        rn3.ido = shared.Subscribe(i => rn3.processID(i));
        //
        shared.Connect();
    }

    public class MyRowHandler
    {
        public IDisposable ido = null;
        public void processID(int i)
        {
            var o = Observable.Start(() =>
                                        {
                                            Console.WriteLine(String.Format("Start Thread ID {0} Int{1}", Thread.CurrentThread.ManagedThreadId, i));
                                            Thread.Sleep(30);
                                            Console.WriteLine("Done Thread ID"+Thread.CurrentThread.ManagedThreadId.ToString());
                                        }
                                    );
            o.First(); 
        }
    }
}

发现:

从 Rx 获得的编码速度和代码质量的提高是以牺牲性能为代价的。毫无疑问,任务/代表的速度要快几倍。这意味着需要了解 Rx 的最重要的事情是何时使用 Rx。以下是一份概要指南草案。对于大容量,我可以看到 Rx 在分块、组合和其他许多流多处理程序模型中的用途;但是,基本 Async 不应该使用 rx。

我会发布带有矩阵指南的图像,但该网站不允许我发布图像

4

1 回答 1

3

如果我正确理解了您的排序要求并且您想要三个并行运行的扫描,您可以在 TaskPool 上观察并从那里订阅;

...

//Create the RowHandlers
MyRowHandler rn1 = new MyRowHandler();
rn1.ido = shared.ObserveOn(Scheduler.TaskPool).Subscribe(i => rn1.processID(i));

...

请注意,由于您随后异步运行并且您的主线程不会等待扫描完成,因此您的程序将立即终止,除非您将 aConsole.ReadKey()放在程序末尾。

编辑:关于“一路”运行同一个线程,你为此安排了一些奇怪的事情。如果将 observable 放到 rowhandler 中,则可以使用 Scheduler.NewThread 并获得不错的结果;

...

var rowHandler1 = new MyRowHandler();
rowHandler1.ido = shared.ObserveOn(Scheduler.NewThread).Subscribe(rowHandler1.ProcessID);

...

public void ProcessID(int i)
{
    Console.WriteLine(String.Format("Start Thread ID {0} Int{1}", Thread.CurrentThread.ManagedThreadId, i));
    Thread.Sleep(30);
    Console.WriteLine("Done Thread ID" + Thread.CurrentThread.ManagedThreadId.ToString(CultureInfo.InvariantCulture));
}

这将为每个订阅提供自己的线程,并坚持下去。

于 2012-04-14T06:35:44.613 回答