1

我想创建一个 Pipe 类,以分两步的方式在 Ruby 中模拟 Unix 命令。第一步是通过添加一些命令来编译管道,第二步是运行该管道。这是一个样机:

#!/usr/bin/env ruby

p = Pipe.new
p.add(:cat, input: "table.txt")
p.add(:cut, field: 2)
p.add(:grep, pattern: "foo")
p.add(:puts, output: "result.txt")
p.run

问题是如何使用惰性评估对此进行编码,以便在调用时逐个记录地处理管道,run()而无需在任何时候将所有数据加载到内存中?

4

5 回答 5

3

看看http://ruby-doc.org/core-2.0.0/Enumerator.html类。该类Pipe将缝合在一起Enumerator,例如add(:cat, input: 'foo.txt')将创建一个产生行的枚举器foo.txtadd(:grep)将根据正则表达式等过滤它。

这是懒惰的文件阅读器

require 'benchmark'

def lazy_cat(filename)
  e = Enumerator.new do |yielder|
    f = File.open filename
    s = f.gets
    while s
      yielder.yield s
      s = f.gets
    end
  end
  e.lazy
end

def cat(filename)
  Enumerator.new do |yielder|
    f = File.open filename
    s = f.gets
    while s
      yielder.yield s
      s = f.gets
    end
  end
end

lazy = Benchmark.realtime { puts lazy_cat("log.txt").map{|s| s.upcase}.take(1).to_a }
puts "Lazy: #{lazy}"

eager = Benchmark.realtime { puts cat("log.txt").map{|s| s.upcase}.take(1).to_a }
puts "Eager: #{eager}"

急切版本需要 7 秒处理 1000 万行文件,懒惰版本几乎不需要时间。

于 2013-11-18T13:38:02.000 回答
1

据我了解,您可以一次简单地读取一行并将这一行移动到管道中,然后将其写入输出。一些代码:

output = File.new("output.txt")
File.new("input.txt").each do |line|
    record = read_record(line)
    newrecord = run_pipeline_on_one_record(record)
    output.write(dump_record(newrecord))
end

另一个更重的选项是创建实际的 IO 阻塞管道,并为管道中的每个任务使用一个线程。这在某种程度上重新组合了 Unix 所做的事情。

OP 语法的示例用法:

class Pipe
    def initialize
        @actions = []
    end
    def add(&block)
        @actions << block
    end
    def run(infile, outfile)
        output = File.open(outfile, "w")
        File.open(infile).each do |line|
            line.chomp!
            @actions.each {|act| line = act[line] }
            output.write(line+"\n")
        end
    end
end

p = Pipe.new
p.add {|line| line.size.to_s }
p.add {|line| "number of chars: #{line}" }
p.run("in.txt", "out.txt")

样品in.txt

aaa
12345
h

生成out.txt

number of chars: 3
number of chars: 5
number of chars: 1
于 2013-11-18T14:07:34.623 回答
1

这似乎有效:

#!/usr/bin/env ruby

require 'pp'

class Pipe
  def initialize
    @commands = []
  end

  def add(command, options = {})
    @commands << [command, options]

    self
  end

  def run
    enum = nil

    @commands.each do |command, options|
      enum = method(command).call enum, options
    end

    enum.each {}

    enum
  end

  def to_s
    cmd_string = "Pipe.new"

    @commands.each do |command, options|
      opt_list = []

      options.each do |key, value|
        if value.is_a? String
          opt_list << "#{key}: \"#{value}\""
        else
          opt_list << "#{key}: #{value}"
        end
      end

      cmd_string << ".add(:#{command}, #{opt_list.join(", ")})"
    end

    cmd_string << ".run"
  end

  private

  def cat(enum, options)
    Enumerator.new do |yielder|
      enum.map { |line| yielder << line } if enum

      File.open(options[:input]) do |ios|
        ios.each { |line| yielder << line }
      end
    end.lazy
  end

  def cut(enum, options)
    Enumerator.new do |yielder|
      enum.each do |line|
        fields = line.chomp.split(%r{#{options[:delimiter]}})

        yielder << fields[options[:field]]
      end
    end.lazy
  end

  def grep(enum, options)
    Enumerator.new do |yielder|
      enum.each do |line|
        yielder << line if line.match(options[:pattern])
      end
    end.lazy
  end

  def save(enum, options)
    Enumerator.new do |yielder|
      File.open(options[:output], 'w') do |ios|
        enum.each do |line|
          ios.puts line
          yielder << line
        end
      end
    end.lazy
  end
end

p = Pipe.new
p.add(:cat, input: "table.txt")
p.add(:cut, field: 2, delimiter: ',\s*')
p.add(:grep, pattern: "4")
p.add(:save, output: "result.txt")
p.run

puts p
于 2013-11-18T15:19:14.523 回答
0

https://stackoverflow.com/a/20049201/3183101

require 'benchmark'

def lazy_cat(filename)
  e = Enumerator.new do |yielder|
    f = File.open filename
    s = f.gets
    while s
      yielder.yield s
      s = f.gets
    end
  end
  e.lazy
end

def cat(filename)
  Enumerator.new do |yielder|
    f = File.open filename
    s = f.gets
    while s
      yielder.yield s
      s = f.gets
    end
  end
end

lazy = Benchmark.realtime { puts lazy_cat("log.txt").map{|s| s.upcase}.take(1).to_a }
puts "Lazy: #{lazy}"

eager = Benchmark.realtime { puts cat("log.txt").map{|s| s.upcase}.take(1).to_a }
puts "Eager: #{eager}"

这可以简化为以下内容,我认为这使得两种方法之间的差异更容易看到。

require 'benchmark'

def cat(filename, evaluation_strategy: :eager)
  e = Enumerator.new do |yielder|
    f = File.open filename
    s = f.gets
    while s
      yielder.yield s
      s = f.gets
    end
  end
  e.lazy if evaluation_strategy == :lazy
end

lazy = Benchmark.realtime { puts cat("log.txt", evaluation_strategy: :lazy).map{ |s|
  s.upcase}.take(1).to_a 
}
puts "Lazy: #{lazy}"

eager = Benchmark.realtime { puts cat("log.txt", evaluation_strategy: :eager).map{ |s|
  s.upcase}.take(1).to_a 
}
puts "Eager: #{eager}"

我本来只想把它放在评论中,但我在这里太“绿”了,不允许这样做。无论如何,发布我认为的所有代码的能力使它更清晰。

于 2014-01-25T18:58:46.970 回答
0

这建立在以前的答案的基础上,并作为有关枚举器陷阱的警告。尚未用尽(即 raise StopIteration)的枚举器将不会运行确保块。这意味着像这样的构造File.open { }不会自行清理。

例子:

def lazy_cat(filename)
  f = nil  # visible to the define_singleton_method block
  e = Enumerator.new do |yielder|
    # Also stored in @f for demonstration purposes only, so we examine it later
    @f = f = File.open filename
    s = f.gets
    while s
      yielder.yield s
      s = f.gets
    end
  end
  e.lazy.tap do |enum|
    # Provide a finish method to close the File
    # We can't use def enum.finish because it can't see 'f'
    enum.define_singleton_method(:finish) do
      f.close
    end
  end
end

def get_first_line(path)
  enum = lazy_cat(path)
  enum.take(1).to_a
end

def get_first_line_with_finish(path)
  enum = lazy_cat(path)
  enum.take(1).to_a
ensure
  enum.finish
end


# foo.txt contains:
# abc
# def
# ghi

puts "Without finish"
p get_first_line('foo.txt')
if @f.closed?
  puts "OK: handle was closed"
else
  puts "FAIL: handle not closed!"
  @f.close
end
puts

puts "With finish"
p get_first_line_with_finish('foo.txt')
if @f.closed?
  puts "OK: handle was closed"
else
  puts "FAIL: handle not closed!"
  @f.close
end

运行它会产生:

Without finish
["abc\n"]
FAIL: handle not closed!

With finish
["abc\n"]
OK: handle was closed

请注意,如果您不提供该finish方法,则不会关闭流,并且您将泄漏文件描述符。GC 可能会关闭它,但你不应该依赖它。

于 2016-07-15T17:00:29.673 回答