4

在 ruby​​ 代码中,我正在使用 Open3.popen3 运行系统调用,并使用生成的 IO 用于 stdout 和 stderr 在写入一个日志文件之前进行一些日志消息格式化。我想知道这样做的最佳方法是什么,以便日志消息将保持正确的顺序,注意我需要对错误消息和标准输出消息进行单独的格式化。

这是我当前的代码(假设记录器是线程安全的)

Open3.popen3("my_custom_script with_some_args") do |_in, stdout, stderr|
  stdout_thr = Thread.new do
    while line = stdout.gets.chomp
      logger.info(format(:info, line))
    end
  end
  stderr_thr = Thread.new do
    while line = stderr.gets.chomp
      logger.error(format(:error, line))
    end
  end
  [stdout_thr, stderr_thr].each(&:join)
end

到目前为止,这对我有用,但我不太确信我可以保证日志消息的正确顺序。有没有更好的办法?

4

1 回答 1

2

你想要达到的目标是不可能有保证的。首先要注意的是,您的代码只能根据接收数据的时间而不是生成时间来排序,这并不完全相同。保证这一点的唯一方法是在源上做一些事情,这将在两个系统之间添加一些有保证的排序。

下面的代码应该通过删除线程使其“更有可能”是正确的。假设您正在使用 MRI,线程是“绿色的”,因此技术上不能同时运行。这意味着您很感激调度程序选择在“正确”时间运行您的线程。

Open3.popen3("my_custom_script with_some_args") do |_in, stdout, stderr|
  for_reading = [stdout, stderr]
  until(for_reading.empty?) do
    wait_timeout = 1
    # IO.select blocks until one of the streams is has something to read
    # or the wait timeout is reached
    readable, _writable, errors = IO.select(for_reading, [], [], wait_timeout)

    # readable is nil in the case of a timeout - loop back again
    if readable.nil?
      Thread.pass
    else
      # In the case that both streams are readable (and thus have content)
      # read from each of them. In this case, we cannot guarantee any order
      # because we recieve the items at essentially the same time.
      # We can still ensure that we don't mix data incorrectly.
      readable.each do |stream|
        buffer = ''
        # loop through reading data until there is an EOF (value is nil)
        # or there is no more data to read (value is empty)
        while(true) do
          tmp = stream.read_nonblock(4096, buffer, exception: false)
          if tmp.nil?
            # stream is EOF - nothing more to read on that one..
            for_reading -= [stream]
            break
          elsif tmp.empty? || tmp == :wait_readable
            # nothing more to read right now...
            # continue on to process the buffer into lines and log them
            break
          end
        end

        if stream == stdout
          buffer.split("\n").each { |line| logger.info(format(:info, line)) }
        elsif stream == stderr
          buffer.split("\n").each { |line| logger.info(format(:error, line)) }
        end
      end
    end
  end
end

请注意,在一个在很短的时间内产生大量输出的系统中,更有可能发生重叠,导致事情发生混乱。这种可能性随着读取和处理流所花费的时间而增加。最好确保在循环内完成绝对最小的处理。如果格式化(和写入)成本很高,请考虑将这些项目移动到从单个队列读取的单独线程中,并让循环内的代码仅将缓冲区(和源标识符)推送到队列中。

于 2020-11-16T08:40:28.467 回答