我有一个应用程序,用户可以将文件直接上传到 S3。这是有效的。现在,我需要一个后台工作人员(目前为delayed_job)来检索文件并将其存储在“tmp/files”中进行处理。
如何才能做到这一点?
编辑:该应用程序目前正在 EC2 中运行。
我有一个应用程序,用户可以将文件直接上传到 S3。这是有效的。现在,我需要一个后台工作人员(目前为delayed_job)来检索文件并将其存储在“tmp/files”中进行处理。
如何才能做到这一点?
编辑:该应用程序目前正在 EC2 中运行。
后台工作人员将独立于您的 Web 应用程序运行。
尝试使用Resque获取常用的 Rails 后台工作程序解决方案。这个想法是你独立于你的 web 应用程序启动 Resque,它的工作独立于应用程序。
让这个工作人员向 S3 发送基本的 HTTP 请求。这是一个API 参考卡,可帮助您入门。这个想法是您使用某种 Ruby REST 客户端来发送这些请求,并解析您从 S3 获得的响应。Rest-client是一个可以用来执行此操作的 gem。
或者,您也可以让工作人员使用S3 gem,这可能会更容易一些。
使用这种方法,您可以让您的工作人员运行一个执行类似操作的脚本
picture = S3Object.find 'headshot.jpg', 'photos'
使用 Resque。
添加
gem 'resque'
gem 'resque-status'
使用 Resque,您需要 Redis(存储有关工作人员的信息)或者使用 Redis-to-go,或者在您的 EC2 机器上本地安装 Redis。
所以在安装 Resque 后编辑 config/initializers/resque.rb
rails_root = ENV['RAILS_ROOT'] || File.dirname(__FILE__) + '/../..'
rails_env = ENV['RAILS_ENV'] || 'production'
resque_config = YAML.load_file(rails_root + '/config/resque.yml')
Resque.redis = resque_config[rails_env]
# This is if you are using Redis to go:
# ENV["REDISTOGO_URL"] ||= "redis://REDISTOGOSTUFFGOESHERE"
# uri = URI.parse(ENV["REDISTOGO_URL"])
# Resque.redis = Redis.new(:host => uri.host, :port => uri.port, :password => uri.password, :thread_safe => true)
Resque::Plugins::Status::Hash.expire_in = (24 * 60 * 60) # 24hrs in seconds
Dir["#{Rails.root}/app/workers/*.rb"].each { |file| require file }
这里我们使用的是本地 Redis,所以 resque.yml 看起来像这样:
development: localhost:6379
test: localhost:6379
fi: localhost:6379
production: localhost:6379
你需要像上帝一样的东西来启动/管理工人
所以安装它然后将“resque-production.god”添加到你的应用程序的config/文件夹你将能够通过这个启动你的worker:god -c config/resque-production.god config/resque-production.god 文件将有类似的东西:
rails_env = ENV['RAILS_ENV'] || "production"
rails_root = ENV['RAILS_ROOT'] || File.dirname(__FILE__) + '/..'
num_workers = 1
num_workers.times do |num|
God.watch do |w|
w.dir = "#{rails_root}"
w.name = "resque-#{num}"
w.group = 'resque'
w.interval = 30.seconds
w.env = {"QUEUE"=>"*", "RAILS_ENV"=>"production"}
w.start = "rake -f #{rails_root}/Rakefile environment resque:work --trace"
w.log = "#{rails_root}/log/resque.log"
w.err_log = "#{rails_root}/log/resque_error.log"
# restart if memory gets too high
w.transition(:up, :restart) do |on|
on.condition(:memory_usage) do |c|
c.above = 350.megabytes
c.times = 2
end
end
# determine the state on startup
w.transition(:init, { true => :up, false => :start }) do |on|
on.condition(:process_running) do |c|
c.running = true
end
end
# determine when process has finished starting
w.transition([:start, :restart], :up) do |on|
on.condition(:process_running) do |c|
c.running = true
c.interval = 5.seconds
end
# failsafe
on.condition(:tries) do |c|
c.times = 5
c.transition = :start
c.interval = 5.seconds
end
end
# start if process is not running
w.transition(:up, :start) do |on|
on.condition(:process_running) do |c|
c.running = false
end
end
end
end
最后是工人。他们进入 app/workers/ 文件夹(这里是 app/workers/processor.rb)
class Processor
include Resque::Plugins::Status
@queue = :collect_queue
def perform
article_id = options["article_id"]
article = Article.find(article_id)
article.download_remote_file(article.file_url)
end
end
它由 Article 模型中的回调触发 (app/models/article.rb)
class Article < ActiveRecord::Base
after_create :process
def download_remote_file(url)
# OpenURI extends Kernel.open to handle URLs as files
io = open(url)
# overrides Paperclip::Upfile#original_filename;
# we are creating a singleton method on specific object ('io')
def io.original_filename
base_uri.path.split('/').last
end
io.original_filename.blank? ? nil : io
end
def process
Processor.create(:article_id => self.id)
end
end