1

我是响应式扩展的新手。

我有一个基于房间的 MMOFPS 游戏服务器,有很多房间,只有一个监听插座。我创建了一个cold observable,它表示从网络接收到的消息,并希望将其转换为hot 以便在多个房间之间共享,以便每个房间都可以过滤和处理它们的相关消息。我的方法正确吗?

另一个问题是,在将可观察到的冷转换为热后,我意识到 SubscribeOn 失去了效果。例如:

var observable = Observable.Return(1).Publish(); 
observable 
    .Where( 
        x => 
            { 
                Console.WriteLine("Filter on {0}", Thread.CurrentThread.ManagedThreadId); 
                return true; 
            }) 
    .SubscribeOn(Scheduler.NewThread) 
    .ObserveOn(Scheduler.NewThread) 
    .Subscribe(x => Console.WriteLine("Received on {0}", Thread.CurrentThread.ManagedThreadId)); 
observable.Connect(); 
Console.WriteLine("End {0}", Thread.CurrentThread.ManagedThreadId); 
Console.ReadLine(); 

结果:
12 收到
10 10 过滤 10

没有发布:

结果:11 结束 10 筛选 12 接收

但是当我使用 Publish.RefCount 自动连接时,它按预期工作。

我错过了什么吗?...

4

1 回答 1

0

应用运算符的顺序很重要。如果你想共享 SubscribeOn 的订阅副作用,那么你必须在 Publish 之前应用它。例如:

var observable = Observable
    .Return(1)
    .SubscribeOn(Scheduler.NewThread)
    .Publish();

欲了解更多信息: http ://social.msdn.microsoft.com/Forums/pl-PL/rx/thread/29de2890-8303-404d-a6f8-f0fe0f716a86

于 2012-09-20T07:40:41.897 回答