2

我有以下函数,它以异步流的形式返回标准输出数据,该数据由运行System.Diagnostics.Process. 当前方法中的所有内容都按预期工作;我可以在一个循环中调用它,await foreach()我得到的每一行输出都是由外部 exe 生成的。

private static async IAsyncEnumerable<string> ProcessAsyncStream (
    ProcessStartInfo processStartInfo)
{
   // Ensure that process is destroyed when this method exits
   using var process = new Process() { StartInfo = processStartInfo };

   // Buffer used to pass data from event-handler back to this method
   BufferBlock<string> dataBuffer = new BufferBlock<string>();

   process.OutputDataReceived += (s, e) =>
   {
      if (e.Data is null)
      {
         dataBuffer.Complete();
      }
      else
      {
         dataBuffer.Post(e.Data);
      }
   };

   // Start process and redirect output streams
   process.Start();
   process.BeginOutputReadLine();

   // Return data line by line  
   while (await dataBuffer.OutputAvailableAsync())
      yield return dataBuffer.Receive();
}

我的问题是现在我需要它来返回标准输出和标准错误结果。我创建了这个类来保存每个流中的数据。

public class ProcessData
{
   public string Error { get; set; } = "";
   public string Output { get; set; } = "";
}

并变成ProcessAsyncStream()了这样

private static async IAsyncEnumerable<ProcessData> ProcessAsyncStream (
    ProcessStartInfo processStartInfo)
{
   // Ensure that process is destroyed when this method exits
   using var process = new Process() { StartInfo = processStartInfo };

   // Buffer used to pass data from event-handlers back to this method
   BufferBlock<string> outputDataBuffer = new BufferBlock<string>();
   BufferBlock<string> errorDataBuffer = new BufferBlock<string>();

   
   process.OutputDataReceived += (s, e) =>
   {
      if (e.Data is null)
      {
         outputDataBuffer.Complete();
      }
      else
      {
         outputDataBuffer.Post(e.Data);
      }
   };

   process.ErrorDataReceived += (s, e) =>
   {
      if (e.Data is null)
      {
         errorDataBuffer.Complete();
      }
      else
      {
         errorDataBuffer.Post(e.Data);
      }
   };

   // Start process and redirect output streams
   process.Start();
   process.BeginOutputReadLine();

   // Return data line by line
   while (await outputDataBuffer.OutputAvailableAsync()
          || await errorDataBuffer.OutputAvailableAsync())
      yield return new ProcessData() 
      {
         Error = errorDataBuffer.Receive(), 
         Output = outputDataBuffer.Receive()
      }
}

问题是,如果任何一个缓冲区在另一个缓冲区完成之前完成,而不是该方法挂起,因为该缓冲区.Receive()没有任何数据要接收。如果我将while条件更改为&&then 我将不会从另一个缓冲区获取所有数据。

有什么建议么?

4

3 回答 3

3

关于实际问题,读取块的流程存在问题。最简单的解决方案是仅使用具有多个生产者的单个缓冲区和一个结合消息包的单个消费者

您尝试使用DataFlow 块解决的概念问题在于异步流事件的基本性质。推送事件,并拉取异步流

有几种解决方案可以将它们映射在一起,但我认为最优雅的方法就是使用Unbounded Channel作为缓冲区

Channels 是比 DataFlow 更现代的方法,自由度更低,比 a 更笨重BufferBlock,而且非常轻量级和高度优化。此外,我只会为不同的响应类型传递一个包装器。

忽略任何其他问题(概念或其他问题)。

给定

public enum MessageType
{
   Output,
   Error
}

public class Message
{
   public MessageType MessageType { get; set; }
   public string Data { get; set; }

   public Message(string data, MessageType messageType )
   {
      Data = data;
      MessageType = messageType;
   }
}

用法

private async IAsyncEnumerable<Message> ProcessAsyncStreamAsync(
     ProcessStartInfo processStartInfo, 
     CancellationToken cancellationToken)
{
   using var process = new Process() { StartInfo = processStartInfo };

   var ch = Channel.CreateUnbounded<Message>();
   var completeCount = 0;

   void OnReceived(string data, MessageType type)
   {
      // The Interlocked memory barrier is likely overkill here
      if (data is null && Interlocked.Increment(ref completeCount) == 2)
         ch?.Writer.Complete();
      else
         ch?.Writer.WriteAsync(new Message(data, type), cancellationToken);
   }

   process.OutputDataReceived += (_, args) => OnReceived(args.Data, MessageType.Output);
   process.ErrorDataReceived += (_, args) => OnReceived(args.Data, MessageType.Error);

   // start the process 
   // ...

   await foreach (var message in ch.Reader
           .ReadAllAsync(cancellationToken)
           .ConfigureAwait(false))
      yield return message;

   // cleanup
   // ...
}

注意:完全未经测试

于 2020-11-04T08:05:48.600 回答
1

而是在退出时完成。

void HandleData(object sender, DataReceivedEventArgs e)
{
    if (e.Data != null) dataBuffer.Post(e.Data);
}

process.OutputDataReceived += HandleData;
process.ErrorDataReceived += HandleData;
process.Exited += (s,e) => 
{
    process.WaitForExit();
    dataBuffer.Complete();
};
于 2020-11-04T08:57:44.823 回答
0

您可以使用单个ProcessData项目缓冲区:

var buffer = new BufferBlock<ProcessData>();

然后在两个事件都传播了一个值Complete时使用自定义机制来完成缓冲区:null

process.OutputDataReceived += (s, e) =>
{
    if (e.Data is null) Complete(1);
        else buffer.Post(new ProcessData() { Output = e.Data });
};

process.ErrorDataReceived += (s, e) =>
{
    if (e.Data is null) Complete(2);
        else buffer.Post(new ProcessData() { Error = e.Data });
};

这是该Complete方法的实现:

bool[] completeState = new bool[2];
void Complete(int index)
{
    bool completed;
    lock (completeState.SyncRoot)
    {
        completeState[index - 1] = true;
        completed = completeState.All(v => v);
    }
    if (completed) buffer.Complete();
}
于 2020-11-04T07:07:17.523 回答