2

首先,我的目标是 .Net Core 3.1 和 C#8。

我想要这样的东西。

public static async Task<MyDataObj> GetData()
{
  var dataObj = new MyDataObj();
  var args = ArgsHelperFunction(...);
  
  await foreach(string result in RunProcessAsync(args))
  {
     // Process result and store it in dataObj
  }

  return dataObj;
}

private static async IAsyncEnumerable<string> RunProcessAsync(string args)
{
  await using (var myProcess = new Process())
  {
    myProcess.StartInfo.FileName = @"path\to\file.exe"
    myProcess.StartInfo.Arguments = args;
    myProcess.StartInfo.UseShellExecute = false;
    myProcess.StartInfo.RedirectStandardError = true;
    myProcess.StartInfo.CreateNoWindow = true;

    myProcess.ErrorDataReceived += (s, e) =>
    {
      yield return e.Data;
    }

    myProcess.Start();
    myProcess.BeginErrorReadLine();
    process.WaitforExit();
  }
}

当我尝试此设置时,我收到错误await foreach(string result in RunProcessAsync(args))

CS8417 'Process':异步 using 语句中使用的类型必须隐式转换为 'System.IAsyncDisposable' 或实现合适的 'DisposeAsync' 方法。

这个错误来自yield return e.Data;

CS1621 不能在匿名方法或 lambda 表达式中使用 yield 语句

目标是这样的。我有一个执行一些操作并将信息写入错误输出流的 exe(不确定这是否是它的真实名称)。我想在写入时接受这些写入,解析它们以获得我想要的信息并将它们存储在一个对象中以供以后使用。

我是一个非常新手的编码器,对异步编码非常陌生。我测试了RunProcessAsync但以同步方式的功能;它被调用的地方,只是将所有原始数据写入输出窗口,而不将任何数据返回给调用方法。那工作得很好。另外,我得到了一个使用 的测试 asyc 流IAsyncEnumerable,但它只是使用Task.Delay并返回了一些整数。现在我正在尝试将这些东西结合起来,而我缺乏经验正在妨碍我。

感谢大家提供的任何帮助以及帮助提高 C# 的技能和知识。

4

2 回答 2

1

如果没有最小的、可重现的示例,就不可能完全解决您的问题。但我们可以处理你提出的两个具体问题。

首先,如果您的对象(例如Process)不支持IAsyncDisposable,那么就不要使用它。请改用同步using语句。

yield return方法而言,如果您花一点时间,您可能会发现您尝试编写的内容没有任何意义。事件处理程序是一个完全不同的方法,如何能够使当前方法产生一个新值?当事件发生时,您需要事件处理程序向当前方法发出信号。您可以通过多种方式执行此操作,但这SemaphoreSlim是更直接的方式之一。

把它们放在一起,你可能会得到这样的东西:

private static async IAsyncEnumerable<string> RunProcessAsync(string args)
{
  using (var myProcess = new Process())
  {
    myProcess.StartInfo.FileName = @"path\to\file.exe";
    myProcess.StartInfo.Arguments = args;
    myProcess.StartInfo.UseShellExecute = false;
    myProcess.StartInfo.RedirectStandardError = true;
    myProcess.StartInfo.CreateNoWindow = true;

    ConcurrentQueue<string> dataQueue = new ConcurrentQueue<string>();
    SemaphoreSlim dataSemaphore = new SemaphoreSlim(0);

    myProcess.ErrorDataReceived += (s, e) =>
    {
      dataQueue.Enqueue(e.Data);
      dataSemaphore.Release();
    }

    myProcess.Start();
    myProcess.BeginErrorReadLine();

    while (true)
    {
      await dataSemaphore.WaitAsync();

      // Only one consumer, so this will always succeed
      dataQueue.TryDequeue(out string data);

      if (data == null) break;

      yield return data;
    }
  }
}

由于您没有提供实际的 MCVE,因此我无法尝试从头开始重建您的场景。所以上面没有编译,没关系测试。但它应该显示要点。

也就是说,您需要保持迭代器方法异步(这意味着您不能阻止对 的调用WaitForExit()),并且您需要以某种方式将ErrorDataReceived事件处理程序接收到的数据移回迭代器方法。在上面,我将线程安全队列对象与信号量结合使用。

Release()每次接收到一行数据时,事件处理程序中的信号量计数都会增加(通过),然后迭代器方法通过减少信号量计数(通过WaitAsync())并返回接收到的行来消耗这些数据。

这里还有很多其他机制可以用于生产者/消费者方面。这里有一个广受好评的问答,它讨论了异步兼容机制,包括BlockingCollection<T>支持异步操作的自定义版本,并提到了TPL Dataflow 中BufferBlock<T>

这是一个使用的示例BufferBlock<T>(其语义非常相似,BlockingCollection<T>但包括对消费代码的异步处理):

static async IAsyncEnumerable<string> RunProcessAsync(string args)
{
    using (var process = new Process())
    {
        myProcess.StartInfo.FileName = @"path\to\file.exe";
        process.StartInfo.Arguments = args;
        process.StartInfo.UseShellExecute = false;
        process.StartInfo.RedirectStandardError = true;
        process.StartInfo.CreateNoWindow = true;

        BufferBlock<string> dataBuffer = new BufferBlock<string>();

        process.ErrorDataReceived += (s, e) =>
        {
            if (e.Data != null)
            {
                dataBuffer.Post(e.Data);
            }
            else
            {
                dataBuffer.Complete();
            }
        };

        process.Start();
        process.BeginErrorReadLine();

        while (await dataBuffer.OutputAvailableAsync())
        {
            yield return dataBuffer.Receive();
        }
    }
}

于 2020-09-18T06:18:46.303 回答
0

像这样的东西?

using CliWrap;
using CliWrap.EventStream;

var cmd = Cli.Wrap("foo").WithArguments("bar");

await foreach (var cmdEvent in cmd.ListenAsync())
{
    switch (cmdEvent)
    {
        case StartedCommandEvent started:
            _output.WriteLine($"Process started; ID: {started.ProcessId}");
            break;
        case StandardOutputCommandEvent stdOut:
            _output.WriteLine($"Out> {stdOut.Text}");
            break;
        case StandardErrorCommandEvent stdErr:
            _output.WriteLine($"Err> {stdErr.Text}");
            break;
        case ExitedCommandEvent exited:
            _output.WriteLine($"Process exited; Code: {exited.ExitCode}");
            break;
    }
}

这在CliWrap中可用。

于 2020-10-09T14:13:51.943 回答