我是响应式扩展的新手。
我有一个基于房间的 MMOFPS 游戏服务器,有很多房间,只有一个监听插座。我创建了一个cold observable,它表示从网络接收到的消息,并希望将其转换为hot 以便在多个房间之间共享,以便每个房间都可以过滤和处理它们的相关消息。我的方法正确吗?
另一个问题是,在将可观察到的冷转换为热后,我意识到 SubscribeOn 失去了效果。例如:
var observable = Observable.Return(1).Publish();
observable
.Where(
x =>
{
Console.WriteLine("Filter on {0}", Thread.CurrentThread.ManagedThreadId);
return true;
})
.SubscribeOn(Scheduler.NewThread)
.ObserveOn(Scheduler.NewThread)
.Subscribe(x => Console.WriteLine("Received on {0}", Thread.CurrentThread.ManagedThreadId));
observable.Connect();
Console.WriteLine("End {0}", Thread.CurrentThread.ManagedThreadId);
Console.ReadLine();
结果:
12 收到
10 10 过滤 10
没有发布:
结果:11 结束 10 筛选 12 接收
但是当我使用 Publish.RefCount 自动连接时,它按预期工作。
我错过了什么吗?...