我需要将目录中的一堆文件上传到 S3。由于上传所需的 90% 以上的时间都花在等待 http 请求完成上,我想以某种方式一次执行其中的几个。
Fibers 能帮我解决这个问题吗?它们被描述为解决此类问题的一种方法,但我想不出在 http 调用阻塞时可以做任何工作的任何方法。
有什么办法可以在没有线程的情况下解决这个问题?
我不支持 1.9 中的纤维,但 1.8.6 中的常规线程可以解决这个问题。尝试使用队列http://ruby-doc.org/stdlib/libdoc/thread/rdoc/classes/Queue.html
查看文档中的示例,您的消费者是上传的部分。它“使用”一个 URL 和一个文件,并上传数据。生产者是程序的一部分,它继续工作并找到要上传的新文件。
如果您想一次上传多个文件,只需为每个文件启动一个新线程:
t = Thread.new do
upload_file(param1, param2)
end
@all_threads << t
然后,稍后在您的“生产者”代码中(记住,不必在自己的线程中,它可以是主程序):
@all_threads.each do |t|
t.join if t.alive?
end
队列可以是@member_variable 或$global。
要回答您的实际问题:
Fibers 能帮我解决这个问题吗?
不,他们不能。Jörg W Mittag解释了为什么最好。
不,您不能使用 Fibers 进行并发处理。Fiber 根本不是并发构造,它们是控制流构造,就像异常一样。这就是 Fibers 的全部意义:它们从不并行运行,它们是合作的,它们是确定性的。纤维是协程。(事实上,我一直不明白为什么它们不被简单地称为协程。)
Ruby 中唯一的并发构造是 Thread。
当他说 Ruby 中唯一的并发结构是线程时,请记住 Ruby 有许多不同的实现,并且它们的线程实现各不相同。Jörg 再次为这些差异提供了一个很好的答案。并正确得出结论,只有像 JRuby(使用映射到本机线程的 JVM 线程)或分叉您的进程这样的东西才能实现真正的并行性。
有什么办法可以在没有线程的情况下解决这个问题?
除了分叉您的流程之外,我还建议您查看EventMachine和类似em-http-request 的东西。它是一个事件驱动的、非阻塞的、基于反应器模式的 HTTP 客户端,它是异步的,不会产生线程开销。
Aaron Patterson (@tenderlove) 使用了一个几乎与您完全相同的示例来准确描述为什么您可以并且应该在您的情况下使用线程来实现并发。
现在大多数 I/O 库都足够智能,可以在执行 IO 时释放 GVL(Global VM Lock,或者大多数人知道的 GIL 或 Global Interpreter Lock)。C 中有一个简单的函数调用来执行此操作。您无需担心 C 代码,但对您而言,这意味着大多数值得一提的 IO 库将释放 GVL 并允许其他线程执行,而正在执行 IO 的线程等待数据返回.
如果我刚才说的令人困惑,你不必太担心。您需要知道的主要事情是,如果您使用一个像样的库来执行您的 HTTP 请求(或任何其他 I/O 操作...数据库、进程间通信等),Ruby 解释器 (MRI)足够聪明,能够释放解释器上的锁,并允许其他线程在一个线程等待 IO 返回时执行。如果下一个线程有自己的 IO 要抓取,Ruby 解释器也会做同样的事情(假设 IO 库是为利用 Ruby 的这个特性而构建的,我相信现在大多数都是这样)。
所以,总结一下我所说的,使用线程!您应该会看到性能优势。如果没有,请检查您的 http 库是否使用 C 中的 rb_thread_blocking_region() 函数,如果没有,请找出原因。也许有充分的理由,也许您需要考虑使用更好的库。
Aaron Patterson 视频的链接在这里: http ://www.youtube.com/watch?v=kufXhNkm5WU
值得一看,即使只是为了笑,因为 Aaron Patterson 是互联网上最有趣的人之一。
您可以为此使用单独的进程而不是线程:
#!/usr/bin/env ruby
$stderr.sync = true
# Number of children to use for uploading
MAX_CHILDREN = 5
# Hash of PIDs for children that are working along with which file
# they're working on.
@child_pids = {}
# Keep track of uploads that failed
@failed_files = []
# Get the list of files to upload as arguments to the program
@files = ARGV
### Wait for a child to finish, adding the file to the list of those
### that failed if the child indicates there was a problem.
def wait_for_child
$stderr.puts " waiting for a child to finish..."
pid, status = Process.waitpid2( 0 )
file = @child_pids.delete( pid )
@failed_files << file unless status.success?
end
### Here's where you'd put the particulars of what gets uploaded and
### how. I'm just sleeping for the file size in bytes * milliseconds
### to simulate the upload, then returning either +true+ or +false+
### based on a random factor.
def upload( file )
bytes = File.size( file )
sleep( bytes * 0.00001 )
return rand( 100 ) > 5
end
### Start a child uploading the specified +file+.
def start_child( file )
if pid = Process.fork
$stderr.puts "%s: uploaded started by child %d" % [ file, pid ]
@child_pids[ pid ] = file
else
if upload( file )
$stderr.puts "%s: done." % [ file ]
exit 0 # success
else
$stderr.puts "%s: failed." % [ file ]
exit 255
end
end
end
until @files.empty?
# If there are already the maximum number of children running, wait
# for one to finish
wait_for_child() if @child_pids.length >= MAX_CHILDREN
# Start a new child working on the next file
start_child( @files.shift )
end
# Now we're just waiting on the final few uploads to finish
wait_for_child() until @child_pids.empty?
if @failed_files.empty?
exit 0
else
$stderr.puts "Some files failed to upload:",
@failed_files.collect {|file| " #{file}" }
exit 255
end