2

我想创建一个实用方法,把一个操作(Action)包装成 IObservable:该操作仅在订阅时才会被调用,并且(这一点很重要)它要遵循 SubscribeOn(...) 所指定的调度程序。这是我的实现,它基于我从 http://www.introtorx.com 和其他资源中学到的内容,但在一种特定情况下它失败了:

    /// <summary>
    /// Wraps an <see cref="Action"/> in an observable sequence. The action is not
    /// run when this method is called; it is run each time an observer subscribes.
    /// </summary>
    /// <param name="action">The action to run on subscription.</param>
    /// <returns>
    /// A sequence that yields a single <see cref="Unit"/> and then completes once
    /// <paramref name="action"/> has returned, or that terminates with the exception
    /// thrown by <paramref name="action"/>.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public static IObservable<Unit> MakeObservable_2(Action action)
    {
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        return Observable.Create<Unit>(
            observer =>
            {
                // The work is queued on CurrentThreadScheduler; the returned disposable
                // lets the subscriber cancel it while it is still pending.
                return System.Reactive.Concurrency.CurrentThreadScheduler.Instance.Schedule(
                    () =>
                    {
                        try
                        {
                            action();
                        }
                        catch (Exception ex)
                        {
                            observer.OnError(ex);
                            return;
                        }

                        // Notify outside the try block: only failures of the action itself
                        // are reported through OnError. An exception thrown by the
                        // subscriber's own handlers must not be fed back into that subscriber.
                        observer.OnNext(Unit.Default);
                        observer.OnCompleted();
                    });
            });
    }

我原本以为,使用 CurrentThreadScheduler 会让操作在 SubscribeOn() 中指定的调度程序上执行。此实现适用于 .SubscribeOn(TaskPoolScheduler.Default),但不适用于 .SubscribeOn(Dispatcher.CurrentDispatcher)。您能否更改上述实现,使下面的所有单元测试都通过?

    // Subscribes to MakeObservable_2 via SubscribeOn(Dispatcher.CurrentDispatcher) and expects
    // the action to run on the same thread that set up the subscription.
    [Test]
    public void RxActionUtilities_MakeObservableFromAction_WorksAsExpected()
    {
        // Signalled by OnError/OnCompleted of the subscription, or by the timeout task below.
        ManualResetEvent evt = new ManualResetEvent(false);

        // Safety net: release the wait below after 5 s if no notification ever arrives
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(5000);
            Console.WriteLine("Test timed out!");
            evt.Set();
        });

        // Negative sentinels; the asserts at the end check that both were overwritten.
        int threadIdOfAction = -42;
        int threadIdOfSubscriptionContect = -43;
        bool subscriptionWasCalled = false;

        // Records the id of the thread the action actually runs on.
        Action action = () =>
            {
                threadIdOfAction = Thread.CurrentThread.ManagedThreadId;
                Console.WriteLine("This is an action on thread " + threadIdOfAction);
            };

        var observable = RxActionUtilities.MakeObservable_2(action);

        threadIdOfSubscriptionContect = Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine("Before subscription on thread " + threadIdOfSubscriptionContect);

        // The next line is the one I want to have working, but the subscription is never executed
        // NOTE(review): the dispatcher passed here belongs to this very thread, which then blocks
        // in evt.WaitOne() below and never pumps a message loop - presumably why nothing runs; confirm.
        observable.SubscribeOn(Dispatcher.CurrentDispatcher).Subscribe(
            //observable.Subscribe( // would pass
            (unit) =>
            {
                Console.WriteLine("Subscription: OnNext " + threadIdOfAction + ", " + threadIdOfSubscriptionContect);
                subscriptionWasCalled = true;
            },
            (ex) => evt.Set(), () => evt.Set());

        Console.WriteLine("After subscription");

        // Blocks until the sequence terminates or the timeout task fires.
        evt.WaitOne();

        Assert.AreNotEqual(-42, threadIdOfAction);
        Assert.AreNotEqual(-43, threadIdOfSubscriptionContect);

        // The action is expected to have run on the subscribing (dispatcher) thread.
        Assert.AreEqual(threadIdOfAction, threadIdOfSubscriptionContect);
        Assert.That(subscriptionWasCalled);
    }

    // Subscribes to MakeObservable_2 via SubscribeOn(TaskPoolScheduler.Default) and expects the
    // action to run on a different thread than the one that set up the subscription.
    [Test]
    // This test passes with the current implementation
    public void RxActionUtilities_MakeObservableFromActionSubscribeOnDifferentThread_WorksAsExpected()
    {
        // Signalled by OnError/OnCompleted of the subscription, or by the timeout task below.
        ManualResetEvent evt = new ManualResetEvent(false);

        // Safety net: release the wait below after 5 s if no notification ever arrives
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(5000);
            Console.WriteLine("Test timed out!");
            evt.Set();
        });

        // Sentinels must be the same values the AreNotEqual asserts below compare against;
        // otherwise those asserts pass even when the variables were never overwritten.
        int threadIdOfAction = -42;
        int threadIdOfSubscriptionContect = -43;
        bool subscriptionWasCalled = false;

        // Records the id of the thread the action actually runs on.
        Action action = () =>
        {
            threadIdOfAction = Thread.CurrentThread.ManagedThreadId;
            Console.WriteLine("This is an action on thread " + threadIdOfAction);
        };

        var observable = RxActionUtilities.MakeObservable_2(action);

        threadIdOfSubscriptionContect = Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine("Before subscription on thread " + threadIdOfSubscriptionContect);

        // Subscribing on the task pool scheduler moves the subscription, and with it the action,
        // off the test thread.
        observable.SubscribeOn(TaskPoolScheduler.Default).Subscribe(
            (unit) =>
            {
                Console.WriteLine("Subscription: OnNext " + threadIdOfAction + ", " + threadIdOfSubscriptionContect);
                subscriptionWasCalled = true;
            },
            (ex) => evt.Set(), () => evt.Set());

        // Blocks until the sequence terminates or the timeout task fires.
        evt.WaitOne();

        Console.WriteLine("After subscription");

        Assert.AreNotEqual(-42, threadIdOfAction);
        Assert.AreNotEqual(-43, threadIdOfSubscriptionContect);
        Assert.AreNotEqual(threadIdOfAction, threadIdOfSubscriptionContect);
        Assert.That(subscriptionWasCalled);
    }


    // Starts a long-running action (10 iterations of 200 ms) through MakeObservable_2, disposes
    // the subscription after one second and expects the action not to run to completion.
    [Test]
    public void RxActionUtilities_MakeObservableFromAction_IsCancellable()
    {
        // Signalled by the action itself when it finishes, by OnError/OnCompleted of the
        // subscription, or by the timeout task below.
        ManualResetEvent evt = new ManualResetEvent(false);

        // Safety net: release the wait below after 5 s if nothing else signals the event
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(5000);
            Console.WriteLine("Test timed out!");
            evt.Set();
        });

        // Negative sentinels; the asserts at the end check that both were overwritten.
        int threadIdOfAction = -42;
        int threadIdOfSubscriptionContect = -43;
        bool subscriptionWasCalled = false;
        bool actionTerminated = false;

        Action action = () =>
        {
            threadIdOfAction = Thread.CurrentThread.ManagedThreadId;

            // Roughly two seconds of work in total, so the dispose after one second
            // happens while the loop is still running.
            for (int i = 0; i < 10; ++i)
            {
                Console.WriteLine("Some action #" + i);

                Thread.Sleep(200);
            }

            // Only reached if the loop was not interrupted.
            actionTerminated = true;
            evt.Set();
        };

        var observable = RxActionUtilities.MakeObservable_2(action);

        threadIdOfSubscriptionContect = Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine("Before subscription on thread " + threadIdOfSubscriptionContect);

        var subscription =
            observable.SubscribeOn(TaskPoolScheduler.Default).Subscribe(
                (unit) =>
                {
                    Console.WriteLine("Subscription: OnNext " + threadIdOfAction + ", " + threadIdOfSubscriptionContect);
                    subscriptionWasCalled = true;
                },
                (ex) => evt.Set(), () => evt.Set());

        Console.WriteLine("After subscription");

        Thread.Sleep(1000);
        Console.WriteLine("Killing subscription ...");
        // NOTE(review): the action takes no cancellation token and nothing visible here interrupts
        // its thread, so it is unclear how disposing could stop the loop once started - confirm.
        subscription.Dispose();
        Console.WriteLine("... done.");

        evt.WaitOne();

        Assert.IsFalse(actionTerminated);

        Assert.AreNotEqual(-42, threadIdOfAction);
        Assert.AreNotEqual(-43, threadIdOfSubscriptionContect);

        // NOTE(review): the subscription uses SubscribeOn(TaskPoolScheduler.Default), yet this
        // asserts the action ran on the test thread, and the next assert expects OnNext even though
        // the subscription was disposed mid-action - both look copied from the Dispatcher test; confirm.
        Assert.AreEqual(threadIdOfAction, threadIdOfSubscriptionContect);
        Assert.That(subscriptionWasCalled);
    }

更新

作为对李的详尽回答的回应,我再次尝试并重新提出我的问题。IIUC 我们可以总结一下

  • 您无法停止已经开始的操作
  • 我完全误解了 Dispatcher.CurrentDispatcher 以及它是如何工作的:AFAICS 它不应该用作 SubscribeOn() 的参数,而只能用作 ObserveOn 的参数。
  • 我误解了 CurrentThreadScheduler

为了创建可取消的东西,我们需要一个知道取消的动作,例如通过使用Action<CancellationToken>. 这是我的下一次尝试。请告诉我你是否认为这个实现很适合 Rx 框架,或者我们是否可以再次改进它:

/// <summary>
/// Wraps a cancellable action in an observable sequence. On each subscription the action is
/// scheduled on <paramref name="scheduler"/> and handed a token that is cancelled when the
/// subscription is disposed.
/// </summary>
/// <param name="action">The action to run; it is expected to poll the supplied token.</param>
/// <param name="scheduler">The scheduler the action is queued on.</param>
/// <returns>
/// A sequence that completes, without emitting a value, once <paramref name="action"/> has
/// returned, or that terminates with the exception thrown by <paramref name="action"/>.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="action"/> or <paramref name="scheduler"/> is null.
/// </exception>
public static IObservable<Unit> 
    MakeObservable(Action<CancellationToken> action, IScheduler scheduler)
{
    if (action == null)
    {
        throw new ArgumentNullException("action");
    }

    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    return Observable.Create<Unit>(
        observer
        =>
        {
            // internally creates a new CancellationTokenSource
            var cancel = new CancellationDisposable(); 

            var scheduledAction = scheduler.Schedule(() =>
            {
                try
                {
                    action(cancel.Token);
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                    return;
                }

                // Signal completion outside the try block: only failures of the action itself
                // are reported through OnError, not exceptions thrown by the subscriber.
                observer.OnCompleted();
            });

            // Cancellation before execution of action is performed 
            // by disposing scheduledAction
            // Cancellation during execution of action is performed 
            // by disposing cancel
            return new CompositeDisposable(cancel, scheduledAction);
        });
}

如果你正在这样做:我不知道如何使用TestSchedulers 来测试它:

// Runs a token-aware action through MakeObservable on a real thread (NewThreadScheduler) and
// disposes the subscription after 60 ms, expecting the loop to stop early.
[Test]
public void MakeObservableFromCancelableAction_CancellationTakesPlaceWithTrueThread()
{
    var scheduler = NewThreadScheduler.Default;

    Action<CancellationToken> action =
        (cancellationToken) =>
        {
            for (int i = 0; i < 10; ++i)
            {
                Console.WriteLine("Some action #" + i);

                // Cooperative cancellation: leave the loop once the token has been cancelled.
                if (cancellationToken.IsCancellationRequested)
                {
                    break;
                }

                Thread.Sleep(20);
                // Hoping that the disposal of the subscription stops 
                // the loop before we reach i == 4.
                // NOTE(review): this assertion executes inside the action, i.e. on the scheduler's
                // thread; a failure is thrown into MakeObservable's catch block rather than on
                // the test thread - confirm the test runner can actually observe it.
                Assert.Less(i, 4);
            }
        };

    var observable = RxActionUtilities.MakeObservable(action, scheduler);

    var subscription = observable.Subscribe((unit) => { });

    // Let the action run for about three iterations (20 ms each) before cancelling.
    Thread.Sleep(60);

    subscription.Dispose();
}
4

2 回答 2

2

我认为你可以让你的代码更简单,也可以让你的测试更简单。Rx 的美妙之处在于,您应该可以完全不再使用 Task/Thread/ManualResetEvent 这类东西。另外,我认为您也可以直接使用 NUnit 的 [Timeout] 属性,而不是您自己编写的超时代码。

无论如何... @Per 是对的, Observable.Start 是您正在寻找的。你向它传递了一个 Action 和一个 IScheduler,这看起来正是你想要的。

// Observable.Start with a TestScheduler: the action runs once virtual time is advanced.
[Test]
public void Run_Action_as_IOb_on_scheduler_with_ObStart()
{
    // Arrange
    var testScheduler = new TestScheduler();
    bool actionRan = false;
    Action markAsRun = () => actionRan = true;

    // Act
    var started = Observable.Start(markAsRun, testScheduler);
    IDisposable subscription = started.Subscribe();

    // Assert: nothing has run before the scheduler is advanced by one tick.
    Assert.IsFalse(actionRan);
    testScheduler.AdvanceBy(1);
    Assert.IsTrue(actionRan);

    // Optional clean-up: the sequence has already completed and detached itself.
    subscription.Dispose();
}

但是您可能会注意到它确实有一些奇怪的行为(至少在我在这台 PC 上的 V1 中)。具体来说,Observable.Start 将立即运行 Action,而不是真正等待 observable 序列被订阅。同样由于这个原因,调用订阅,然后在操作应该执行之前处理订阅没有效果。嗯。

// Demonstrates that disposing the subscription of Observable.Start before the scheduler
// runs does NOT prevent the action from running (the final assert fails on purpose).
[Test]
public void Run_Action_as_IOb_on_scheduler_with_ObStart_dispose()
{
    // Arrange
    var testScheduler = new TestScheduler();
    bool actionRan = false;
    Action markAsRun = () => actionRan = true;

    // Act
    var started = Observable.Start(markAsRun, testScheduler);
    IDisposable subscription = started.Subscribe();

    Assert.IsFalse(actionRan);

    // Dispose first, then let the scheduler run.
    subscription.Dispose();
    testScheduler.AdvanceBy(1);

    // FAILS: the action ran anyway, so the flag is true here.
    Assert.IsFalse(actionRan);
}
// Demonstrates that Observable.Start runs the action even when nobody subscribes
// (the final assert fails on purpose).
[Test]
public void Run_Action_as_IOb_on_scheduler_with_ObStart_no_subscribe()
{
    // Arrange
    var testScheduler = new TestScheduler();
    bool actionRan = false;
    Action markAsRun = () => actionRan = true;

    // Act: deliberately no Subscribe() call on the returned sequence.
    Observable.Start(markAsRun, testScheduler);

    Assert.IsFalse(actionRan);
    testScheduler.AdvanceBy(1);

    // FAILS: the action ran without any subscriber, so the flag is true here.
    Assert.IsFalse(actionRan);
}

但是,我们可以按照您的方式使用 Observable.Create。但是,您非常接近,您不需要在 Create 委托中进行任何调度。相信 Rx 会为你做这件事。

// Observable.Create + SubscribeOn with a TestScheduler: the action runs only after a
// subscription exists and virtual time has been advanced.
[Test]
public void Run_Action_as_IOb_on_scheduler_with_ObCreate()
{
    // Arrange
    var testScheduler = new TestScheduler();
    bool actionRan = false;
    Action markAsRun = () => actionRan = true;

    // Subscribe logic: run the action, then signal one Unit and completion;
    // any exception raised along the way is forwarded to the observer.
    Func<IObserver<Unit>, IDisposable> onSubscribe = observer =>
    {
        try
        {
            markAsRun();
            observer.OnNext(Unit.Default);
            observer.OnCompleted();
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
        }

        return Disposable.Empty;
    };

    // Act: without the Subscribe() call the action would never run.
    var source = Observable.Create<Unit>(onSubscribe).SubscribeOn(testScheduler);
    IDisposable subscription = source.Subscribe();

    // Assert
    Assert.IsFalse(actionRan);
    testScheduler.AdvanceBy(1);
    Assert.IsTrue(actionRan);

    // Optional clean-up: the sequence has already completed and detached itself.
    subscription.Dispose();
}

// Observable.Create + SubscribeOn with a TestScheduler: disposing the subscription before
// the scheduler runs prevents the action from running at all.
[Test]
public void Run_Action_as_IOb_on_scheduler_with_ObCreate_dispose()
{
    // Arrange
    var testScheduler = new TestScheduler();
    bool actionRan = false;
    Action markAsRun = () => actionRan = true;

    // Subscribe logic: run the action, then signal one Unit and completion;
    // any exception raised along the way is forwarded to the observer.
    Func<IObserver<Unit>, IDisposable> onSubscribe = observer =>
    {
        try
        {
            markAsRun();
            observer.OnNext(Unit.Default);
            observer.OnCompleted();
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
        }

        return Disposable.Empty;
    };

    // Act: without the Subscribe() call the action would never run.
    var source = Observable.Create<Unit>(onSubscribe).SubscribeOn(testScheduler);
    IDisposable subscription = source.Subscribe();

    Assert.IsFalse(actionRan);

    // Dispose first, then let the scheduler run.
    subscription.Dispose();
    testScheduler.AdvanceBy(1);

    // The subscription was disposed before the scheduler got to run, so the action did not run.
    Assert.IsFalse(actionRan);
}

如果您希望能够在正在处理的动作中途取消实际动作,那么您将需要做一些比这更高级的事情。

最终实现很简单:

/// <summary>
/// Helpers for exposing plain actions as Rx observable sequences.
/// </summary>
public static class RxActionUtilities
{
    /// <summary>
    /// Wraps an <see cref="Action"/> in an observable sequence. The action is not run when
    /// this method is called; it is run synchronously inside each subscription, so the
    /// thread it runs on is chosen by the caller, e.g. with <c>SubscribeOn</c>.
    /// </summary>
    /// <param name="action">The action to run on subscription.</param>
    /// <returns>
    /// A sequence that yields a single <see cref="Unit"/> and then completes once
    /// <paramref name="action"/> has returned, or that terminates with the exception
    /// thrown by <paramref name="action"/>.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    /// <example>
    /// <code>
    /// <![CDATA[
    /// RxActionUtilities.MakeObservable_3(myAction)
    ///                  .SubscribeOn(_schedulerProvider.TaskPoolScheduler)
    ///                  .Subscribe(....);
    /// 
    /// ]]>
    /// </code>
    /// </example>
    public static IObservable<Unit> MakeObservable_3(Action action)
    {
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        return Observable.Create<Unit>(observer =>
            {
                try
                {
                    action();
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                    return Disposable.Empty;
                }

                // Notify outside the try block: only failures of the action itself are
                // reported through OnError. An exception thrown by the subscriber's own
                // handlers must not be fed back into that subscriber.
                observer.OnNext(Unit.Default);
                observer.OnCompleted();

                // The work is already finished when Subscribe returns; nothing to cancel.
                return Disposable.Empty;
            });
    }
}

我希望这会有所帮助。

编辑:关于在您的单元测试中使用 Dispatcher。我认为您应该先弄清楚它是如何工作的,然后再叠加另一层(Rx),否则只会增加混乱。在 WPF 中编码时,Rx 给我带来的主要好处之一,就是通过调度程序(IScheduler)对 Dispatcher 进行了抽象。它使我可以轻松地测试 WPF 中的并发性。例如,下面这个简单的测试会失败:

// Demonstrates a failing test: work queued with Dispatcher.BeginInvoke is not executed
// by the time the assertion is evaluated.
[Test, Timeout(2000)]
public void DispatcherFail()
{
    bool wasRun = false;
    Action queuedWork = () =>
    {
        Console.WriteLine("Running...");
        wasRun = true;
        Console.WriteLine("Run.");
    };

    // Queue the work on the current thread's dispatcher; nothing in this test pumps it.
    var dispatcher = Dispatcher.CurrentDispatcher;
    dispatcher.BeginInvoke(queuedWork);

    Assert.IsTrue(wasRun);
}

如果你运行它,你会注意到甚至没有任何东西打印到控制台,所以我们没有竞争条件,这个动作永远不会运行。原因是调度程序没有启动它的消息循环。为了纠正这个测试,我们必须用杂乱的基础设施代码来填充它。

// Same scenario as the failing dispatcher test, made to pass by explicitly running a
// message loop (DispatcherFrame) that the queued action itself shuts down.
[Test, Timeout(2000)]
public void Testing_with_Dispatcher_BeginInvoke()
{
    // 1 - The message loop
    var messageLoop = new DispatcherFrame();
    bool wasRun = false;

    Action queuedWork = () =>
    {
        Console.WriteLine("Running...");
        wasRun = true;
        Console.WriteLine("Run.");

        // 2 - Stop the message loop, otherwise PushFrame below never returns
        messageLoop.Continue = false;
    };

    var dispatcher = Dispatcher.CurrentDispatcher;
    dispatcher.BeginInvoke(queuedWork);

    // 3 - Start the message loop; returns once the action has cleared Continue
    Dispatcher.PushFrame(messageLoop);

    Assert.IsTrue(wasRun);
}

因此,我们显然不想对所有需要 WPF 并发的测试都这样做。尝试将 frame.Continue=false 注入我们无法控制的操作将是一场噩梦。幸运的是,IScheduler 通过它的 Schedule 方法公开了我们需要的所有东西。

其次,CurrentThreadScheduler 应该被看作一个 Trampoline(蹦床),而不是 SynchronizationContext(我猜您是把它当成了后者)。

于 2012-12-18T17:04:39.190 回答
-1

我认为 Observable.Start 是您正在寻找的。 http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.start(v=vs.103).aspx

于 2012-12-17T13:25:37.083 回答