24

因此,在 C# 4.0 的悲惨日子里,我创建了以下“WorkflowExecutor”类,该类通过侵入 IEnumerable 的“yield return”延续以等待 observables 来允许 GUI 线程中的异步工作流。因此,以下代码将在 button1Click 处启动一个简单的工作流程,更新文本,等待您单击 button2,并在 1 秒后循环。

public sealed partial class Form1 : Form {
    readonly Subject<Unit> _button2Subject = new Subject<Unit>();
    readonly WorkflowExecutor _workflowExecutor = new WorkflowExecutor();

    public Form1() {
        InitializeComponent();
    }

    IEnumerable<IObservable<Unit>> CreateAsyncHandler() {
        Text = "Initializing";
        var scheduler = new ControlScheduler(this);
        while (true) {
            yield return scheduler.WaitTimer(1000);
            Text = "Waiting for Click";
            yield return _button2Subject;
            Text = "Click Detected!";
            yield return scheduler.WaitTimer(1000);
            Text = "Restarting";
        }
    }

    void button1_Click(object sender, EventArgs e) {
        _workflowExecutor.Run(CreateAsyncHandler());
    }

    void button2_Click(object sender, EventArgs e) {
        _button2Subject.OnNext(Unit.Default);
    }

    void button3_Click(object sender, EventArgs e) {
        _workflowExecutor.Stop();
    }
}

public static class TimerHelper {
    public static IObservable<Unit> WaitTimer(this IScheduler scheduler, double ms) {
        return Observable.Timer(TimeSpan.FromMilliseconds(ms), scheduler).Select(_ => Unit.Default);
    }
}

public sealed class WorkflowExecutor {
    IEnumerator<IObservable<Unit>> _observables;
    IDisposable _subscription;

    public void Run(IEnumerable<IObservable<Unit>> actions) {
        _observables = (actions ?? new IObservable<Unit>[0]).GetEnumerator();
        Continue();
    }

    void Continue() {
        if (_subscription != null) {
            _subscription.Dispose();
        }
        if (_observables.MoveNext()) {
            _subscription = _observables.Current.Subscribe(_ => Continue());
        }
    }

    public void Stop() {
        Run(null);
    }
}

这个想法的聪明部分,使用“yield”延续来做异步工作,取自 Daniel Earwicker 的 AsyncIOPipe 想法:http ://smellegantcode.wordpress.com/2008/12/05/asynchronous-sockets-with-yield- return-of-lambdas/,然后我在其上添加了响应式框架。

现在我在使用 C# 5.0 中的异步功能重写它时遇到了麻烦,但它似乎应该是简单的事情。当我将 observables 转换为任务时,它们只运行一次,而 while 循环第二次崩溃。任何帮助解决这个问题都会很棒。

所有这些说/问,异步/等待机制给了我什么 WorkflowExecutor 没有?有什么我可以用 async/await 做的,而我不能用 WorkflowExecutor 做(给定类似数量的代码)?

4

2 回答 2

37

正如 James 所提到的,您可以等待从 Rx v2.0 Beta 开始的 IObservable<T> 序列。行为是返回最后一个元素(在 OnCompleted 之前),或者抛出观察到的 OnError。如果序列不包含任何元素,您将获得 InvalidOperationException。

请注意,使用它,您可以获得所有其他所需的行为:

  • 通过等待 xs.FirstAsync() 获取第一个元素
  • 通过等待 xs.SingleAsync() 确保只有一个值
  • 当您对空序列没问题时,等待 xs.DefaultIfEmpty()
  • 要获取所有元素,请等待 xs.ToArray() 或等待 xs.ToList()

您可以做更多花哨的事情,例如计算聚合结果,但使用 Do 和 Scan 观察中间值:

var xs = Observable.Range(0, 10, Scheduler.Default);

var res = xs.Scan((x, y) => x + y)
            .Do(x => { Console.WriteLine("Busy. Current sum is {0}", x); });

Console.WriteLine("Done! The sum is {0}", await res);
于 2012-05-24T17:08:21.807 回答
31

正如您所注意到的,与 Observable 的“事件流”相反,Task 是一种一次性使用的东西。一个很好的思考方式(恕我直言)是Rx 团队关于 2.0 Beta 的帖子上的 2x2 图表:

任务与可观察的 2x2 图表

根据情况(一次性与“事件流”),保持 Observable 可能更有意义。

如果您可以升级到 Reactive 2.0 Beta,那么您可以使用它来“等待”observables。例如,我自己对代码的“异步/等待”(近似)版本的尝试是:

public sealed partial class Form1 : Form
{
    readonly Subject<Unit> _button2Subject = new Subject<Unit>();

    private bool shouldRun = false;

    public Form1()
    {
        InitializeComponent();
    }

    async Task CreateAsyncHandler()
    {
        Text = "Initializing";
        while (shouldRun)
        {
            await Task.Delay(1000);
            Text = "Waiting for Click";
            await _button2Subject.FirstAsync();
            Text = "Click Detected!";
            await Task.Delay(1000);
            Text = "Restarting";
        }
    }

    async void button1_Click(object sender, EventArgs e)
    {
        shouldRun = true;
        await CreateAsyncHandler();
    }

    void button2_Click(object sender, EventArgs e)
    {
        _button2Subject.OnNext(Unit.Default);
    }

    void button3_Click(object sender, EventArgs e)
    {
        shouldRun = false;
    }
}
于 2012-04-24T02:19:32.907 回答