下面是我尝试将 System.Diagnostics.Process 转换为 IConnectableObservable。这个解决方案有一个问题:我想连续收听标准输出和错误,并使用事件 Process.Exited 作为 OnCompleted 的触发器。不幸的是,我发现 Process.Exited在输出缓冲区为空之前引发。这意味着如果没有我使用线程睡眠的丑陋解决方法,我可以重现不通过 OnNext 语句提供输出的情况。
Q1:您看到此问题的任何解决方法吗?
Q2:关于 System.Reactive:在我的解决方案中我可以做得更好吗?
问候,
马库斯
public static class RxProcessUtilities
{
    /// <summary>
    /// Creates a connectable observable for a process.
    /// </summary>
    /// <remarks>Must be a connectable observable in order to hinder multiple 
    /// subscriptions to call the process multiple times.</remarks>
    /// <param name="process">The process.</param>
    /// <returns></returns>
    public static IConnectableObservable<string> CreateConnectableObservableProcess
        (string filename, string arguments, IObservable<string> input = null)
    {
        var observable = Observable.Using(() =>
            {
                Process process = new Process();
                // process configuration
                process.StartInfo.FileName = filename;
                process.StartInfo.Arguments = arguments;
                process.StartInfo.CreateNoWindow = true;
                process.StartInfo.UseShellExecute = false;
                process.EnableRaisingEvents = true;
                process.StartInfo.RedirectStandardError = true;
                process.StartInfo.RedirectStandardOutput = true;
                if (null != input)
                {
                    process.StartInfo.RedirectStandardInput = true;
                    input.Subscribe(s =>
                        {
                            if (!process.HasExited)
                            {
                                process.StandardInput.Write(s);
                            }
                        });
                }
                return process;
            },
            process =>
            {
                return Observable.Create<string>(
                (IObserver<string> observer) =>
                {
                    // listen to stdout and stderr
                    var stdOut = RxProcessUtilities.CreateStandardOutputObservable(process);
                    var stdErr = RxProcessUtilities.CreateStandardErrorObservable(process);
                    var stdOutSubscription = stdOut.Subscribe(observer);
                    var stdErrSubscription = stdErr.Subscribe(observer);
                    var processExited = Observable.FromEventPattern
                    (h => process.Exited += h, h => process.Exited -= h);
                    var processError = processExited.Subscribe(args =>
                    {
                        // Here is my problem: process sends exited event *before* all 
                        // *DataReceived events have been raised
                        // My ugly workaround for process exit before stdout and stderr buffers are empty.
                        Thread.Sleep(2000);
                        // Also: AFAICS we cannot read synchronously what is left in the buffer, 
                        // since we started asynchronously. This will throw:
                        // string restOfStdOut = process.StandardOutput.ReadToEnd();
                        // string restOfStdErr = process.StandardError.ReadToEnd();
                        if (process.ExitCode != 0)
                        {
                            observer.OnError(new Exception
                                (String.Format("Process '{0}' terminated with error code {1}",
                                 process.StartInfo.FileName, process.ExitCode)));
                        }
                        else
                        {
                            observer.OnCompleted();
                        }
                    });
                    process.Start();
                    process.BeginOutputReadLine();
                    process.BeginErrorReadLine();
                    return new CompositeDisposable
                        (stdOutSubscription,
                         stdErrSubscription,
                         processError);
                });
            });
        return observable.Publish();
    }
    /// <summary>
    /// Creates an IObservable<string> for the standard error of a process.
    /// </summary>
    /// <param name="process">The process.</param>
    /// <returns></returns>
    public static IObservable<string> CreateStandardErrorObservable(Process process)
    {
        // var processExited = Observable.FromEventPattern
        //    (h => process.Exited += h, h => process.Exited -= h);
        var receivedStdErr =
            Observable.FromEventPattern<DataReceivedEventHandler, DataReceivedEventArgs>
                (h => process.ErrorDataReceived += h,
                 h => process.ErrorDataReceived -= h)
            //.TakeUntil(processExited) 
            // cannot be used here, since process exited event might be raised 
            // before all stderr and stdout events occurred.
            .Select(e => e.EventArgs.Data);
        return Observable.Create<string>(observer =>
        {
            var cancel = Disposable.Create(process.CancelErrorRead);
            return new CompositeDisposable(cancel, receivedStdErr.Subscribe(observer));
        });
    }
    /// <summary>
    /// Creates an IObservable<string> for the standard output of a process.
    /// </summary>
    /// <param name="process">The process.</param>
    /// <returns></returns>
    public static IObservable<string> CreateStandardOutputObservable(Process process)
    {
        var receivedStdOut =
            Observable.FromEventPattern<DataReceivedEventHandler, DataReceivedEventArgs>
            (h => process.OutputDataReceived += h,
             h => process.OutputDataReceived -= h)
            .Select(e => e.EventArgs.Data);
        return Observable.Create<string>(observer =>
        {
            var cancel = Disposable.Create(process.CancelOutputRead);
            return new CompositeDisposable(cancel, receivedStdOut.Subscribe(observer));
        });
    }
}