我是 RX 新手。
我想遍历一个 IEnumerable 并发布到在各自线程中处理数据的多个 DataHandlers。
下面是我的示例程序。发布工作并创建了一个新线程,但 3 个 RowHandlers 都在 1 个线程中运行。我需要3个线程。实现这一点的最佳方法是什么?
class Program
{
public class MyDataGenerator
{
public IEnumerable<int> myData()
{
//Heavy lifting....Don't want to process more than once.
yield return 1;
yield return 2;
yield return 3;
yield return 4;
yield return 5;
yield return 6;
}
}
static void Main(string[] args)
{
MyDataGenerator h = new MyDataGenerator();
Console.WriteLine("Thread id " + Thread.CurrentThread.ManagedThreadId.ToString());
//
var shared = h.myData().ToObservable().Publish();
///////////////////////////////
// Row Handling Requirements
//
// 1. Single Scan of IEnumerable.
// 2. Row handlers process data in their own threads.
// 3. OK if scanning thread blocks while data is processed
//
//Create the RowHandlers
MyRowHandler rn1 = new MyRowHandler();
rn1.ido = shared.Subscribe(i => rn1.processID(i));
MyRowHandler rn2 = new MyRowHandler();
rn2.ido = shared.Subscribe(i => rn2.processID(i));
MyRowHandler rn3 = new MyRowHandler();
rn3.ido = shared.Subscribe(i => rn3.processID(i));
//
shared.Connect();
}
public class MyRowHandler
{
public IDisposable ido = null;
public void processID(int i)
{
var o = Observable.Start(() =>
{
Console.WriteLine(String.Format("Start Thread ID {0} Int{1}", Thread.CurrentThread.ManagedThreadId, i));
Thread.Sleep(30);
Console.WriteLine("Done Thread ID"+Thread.CurrentThread.ManagedThreadId.ToString());
}
);
o.First();
}
}
}
发现:
从 Rx 获得的编码速度和代码质量的提高是以牺牲性能为代价的。毫无疑问,任务/代表的速度要快几倍。这意味着需要了解 Rx 的最重要的事情是何时使用 Rx。以下是一份概要指南草案。对于大容量,我可以看到 Rx 在分块、组合和其他许多流多处理程序模型中的用途;但是,基本 Async 不应该使用 rx。
我会发布带有矩阵指南的图像,但该网站不允许我发布图像