I'm having trouble getting my observers to work properly when they observe on a specific thread.

Subject<bool> subjEvent = new Subject<bool>();
Subject<int> subjValue = new Subject<int>();

IScheduler sched = new EventLoopScheduler(ts => new Thread(ts));

subjEvent.ObserveOn(sched).Subscribe(
    r =>
        {
            if(r)
            {
                Console.WriteLine("Connected On: \t{0}", Thread.CurrentThread.ManagedThreadId);

                subjValue.ObserveOn(sched).Subscribe(
                    x => Console.WriteLine("Received On: \t{0}", Thread.CurrentThread.ManagedThreadId));
            }else{
                Console.WriteLine("Disconnect On: \t{0}", Thread.CurrentThread.ManagedThreadId);
            }
        }
    );

subjEvent.OnNext(true);
for(int i=0; i< 10; i++)
{
    subjValue.OnNext(i);
}
subjEvent.OnNext(false);
subjValue.OnCompleted();
subjEvent.OnCompleted();

The idea is to subscribe to something when it becomes available, then unsubscribe/resubscribe in response to later events. Because I need to observe on one specific thread (always the same one) and also guarantee correct ordering, I'm using an EventLoopScheduler.

The problem is that the value subscription never receives anything.

If I add a Thread.Sleep(10) to the generation loop (after OnNext), it works perfectly fine. So I'm kinda perplexed as to what I'm doing wrong and would be very grateful for any help or advice.

2 Answers

Although this doesn't help answer your question directly - I think Asti has done a fine job of doing that - I thought I'd post this code to help you write your queries.

You should try very hard not to subscribe to your observables until you absolutely have to. And when you do, the code in your OnNext handler should be as small as possible.

Try this version of your queries:

var query =
    from b in subjEvent.Do(x =>
        Console.WriteLine("{0}onnected On: \t{1}",
            x ? "C" : "Disc",
            Thread.CurrentThread.ManagedThreadId))
    select b ? subjValue : Observable.Empty<int>();

query.Switch().Subscribe(x =>
    Console.WriteLine("Received On: \t{0}",
        Thread.CurrentThread.ManagedThreadId));

Barring the issue with the event loop scheduler, this code does the same as yours, but it has only one subscribe and does a minimal amount of work in that subscription. I hope this helps.
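
As a rough sketch of how this query might be combined with the event loop scheduler: the version below puts a single ObserveOn after Switch(). That placement is an assumption on my part rather than something spelled out above. The reasoning is that Switch() subscribes to subjValue synchronously inside subjEvent.OnNext(true), so no values should be missed, while the received values are still delivered on the scheduler's thread. Note that the Do logging then runs on the calling thread, not the event loop thread. The ManualResetEventSlim named done is only there so the demo waits for completion before disposing the scheduler.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class Program
{
    static void Main()
    {
        var subjEvent = new Subject<bool>();
        var subjValue = new Subject<int>();
        var done = new ManualResetEventSlim(); // demo-only completion signal

        using (var sched = new EventLoopScheduler(ts => new Thread(ts) { IsBackground = true }))
        {
            // Connect/disconnect logging runs on whichever thread calls subjEvent.OnNext.
            var query =
                from b in subjEvent.Do(x =>
                    Console.WriteLine("{0}onnected On: \t{1}",
                        x ? "C" : "Disc",
                        Thread.CurrentThread.ManagedThreadId))
                select b ? subjValue : Observable.Empty<int>();

            // Switch() subscribes to the inner sequence synchronously;
            // ObserveOn then moves delivery of the values onto the event loop thread.
            query.Switch()
                 .ObserveOn(sched)
                 .Subscribe(
                     x => Console.WriteLine("Received On: \t{0}",
                         Thread.CurrentThread.ManagedThreadId),
                     () => done.Set());

            subjEvent.OnNext(true);
            for (int i = 0; i < 10; i++)
            {
                subjValue.OnNext(i);
            }
            subjEvent.OnNext(false);
            subjValue.OnCompleted();
            subjEvent.OnCompleted();

            done.Wait(); // let the event loop drain before the scheduler is disposed
        }
    }
}
```

If the connect/disconnect logging also has to happen on the scheduler thread, this sketch is not enough on its own, since moving that work behind ObserveOn reintroduces the timing issue.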

Answered 2012-09-10T00:23:57.693

This is expected behaviour, although it is non-deterministic. The problem is that the second subscription is set up on the EventLoopScheduler, and it takes some time before the scheduler actually runs the handler that creates it. Meanwhile, the notifications for it are already being pushed on the main thread. A Subject does not buffer, so any value pushed before the subscription exists is simply lost. By the time the subscription is set up, the loop will usually have finished executing.

To see this happening, remove the ObserveOn from your event subject. This forces the subscription to be set up before the code moves on to notify the second subject with values. Alternatively, let some time elapse after the OnNext(true) so the subscription has a chance to be wired up.
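
A minimal sketch of the first suggestion, reusing the question's code with the outer ObserveOn removed. The inner subscription is now created synchronously inside subjEvent.OnNext(true), so it exists before the loop starts pushing values, while the values themselves are still observed on the event loop thread. The ManualResetEventSlim named done and the background-thread setting are additions for the demo only (so the program can wait for completion and then exit); they are not part of the original code.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class Program
{
    static void Main()
    {
        var subjEvent = new Subject<bool>();
        var subjValue = new Subject<int>();
        var done = new ManualResetEventSlim(); // demo-only completion signal

        using (var sched = new EventLoopScheduler(ts => new Thread(ts) { IsBackground = true }))
        {
            // No ObserveOn here: this handler runs synchronously inside subjEvent.OnNext,
            // on the calling thread.
            subjEvent.Subscribe(r =>
            {
                if (r)
                {
                    Console.WriteLine("Connected On: \t{0}", Thread.CurrentThread.ManagedThreadId);

                    // Values are still delivered on the scheduler's thread.
                    subjValue.ObserveOn(sched).Subscribe(
                        x => Console.WriteLine("Received On: \t{0}", Thread.CurrentThread.ManagedThreadId),
                        () => done.Set());
                }
                else
                {
                    Console.WriteLine("Disconnect On: \t{0}", Thread.CurrentThread.ManagedThreadId);
                }
            });

            subjEvent.OnNext(true);
            for (int i = 0; i < 10; i++)
            {
                subjValue.OnNext(i);
            }
            subjEvent.OnNext(false);
            subjValue.OnCompleted();
            subjEvent.OnCompleted();

            done.Wait(); // let the event loop drain before the scheduler is disposed
        }
    }
}
```

The trade-off is that the "Connected"/"Disconnect" messages are now printed on the thread that calls OnNext rather than on the scheduler thread.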

Answered 2012-09-09T08:18:23.190