编辑:
哈哈-适合我阅读太快-您要问的要简单得多...也就是说,我认为以下内容很重要,所以我要离开了...所以,您的问题-尝试添加此行:
var messagePublisher = m_ServerClient.MessageQueue
.GetConsumingEnumerable()
.ToObservable(TaskPoolScheduler.Default)
.Multicast(subject)
// Here: connectable observables are a PITA...
.RefCount();
结束编辑:
嗯...如何描述Multicast
...我想我们举个例子:
假设你有这样的东西——你认为它会产生什么?
int delay = 100;
var source = Observable.Interval(TimeSpan.FromMilliseconds(delay));
var publishingFrontend = new Subject<string>();
// Here's "raw"
var rawStream = source;
using(rawStream.Subscribe(x => Console.WriteLine("{0}", x)))
{
Thread.Sleep(delay * 3);
using(rawStream.Subscribe(x => Console.WriteLine("Inner: {0}", x)))
{
Thread.Sleep(delay * 3);
}
Thread.Sleep(delay * 5);
}
由于您订阅的是原始流,因此新订阅者基本上是从头开始的:
(如果您重新运行,这将不会 100% 匹配,因为我采取了笨拙的方式Thread.Sleep
,但应该接近)
0
1
2
Inner: 0
3
Inner: 1
4
5
6
7
8
9
嗯……所以如果我们想“在中游”,我们使用以下Publish().RefCount()
模式:
var singleSource = source.Publish().RefCount();
using(singleSource.Subscribe(x => Console.WriteLine("{0}", x)))
{
Thread.Sleep(delay * 3);
using(singleSource.Subscribe(x => Console.WriteLine("Inner: {0}", x)))
{
Thread.Sleep(delay * 3);
}
Thread.Sleep(delay * 5);
}
这会产生类似的东西:
0
1
2
Inner: 2
3
Inner: 3
4
Inner: 4
5
6
7
8
9
假设我们没有Publish()
操作符——我们如何模拟它?
Console.WriteLine("Simulated Publish:");
// use a subject to proxy values...
var innerSubject = new Subject<long>();
// wire up the source to "write to" the subject
var innerSub = source.Subscribe(innerSubject);
var simulatedSingleSource = Observable.Create<long>(obs =>
{
// return subscriptions to the "proxied" subject
var publishPoint = innerSubject.Subscribe(obs);
return publishPoint;
});
运行这个,我们得到:
Simulated Publish:
0
1
2
Inner: 2
3
Inner: 3
4
Inner: 4
5
6
7
8
9
哇!
但还有另一种方法...
Console.WriteLine("MulticastPublish:");
var multicastPublish = source.Multicast(new Subject<long>()).RefCount();
using(multicastPublish.Subscribe(x => Console.WriteLine("{0}", x)))
{
Thread.Sleep(delay * 3);
using(multicastPublish.Subscribe(x => Console.WriteLine("Inner: {0}", x)))
{
Thread.Sleep(delay * 3);
}
Thread.Sleep(delay * 5);
}
输出:
MulticastPublish:
0
1
2
Inner: 2
3
Inner: 3
4
Inner: 4
5
6
7
8
9
编辑:
事实上,所有的ConnectableObservable
生成扩展都依赖于Multicast
/Subject
配对:
Publish() => Multicast(new Subject<T>)
Replay() => Multicast(new ReplaySubject<T>)
PublishLast() => Multicast(new AsyncSubject<T>)