12

下午好,

我有两个独立但相关的应用程序。他们都应该有自己的后台队列(阅读:单独的 Sidekiq 和 Redis 进程)。但是,我希望偶尔能够将作业app2app1.

app1从简单的队列/推送角度来看,如果没有现有的 Sidekiq/Redis 堆栈,这样做很容易:

# In a process, far far away

# Configure client 
Sidekiq.configure_client do |config|
  config.redis = { :url => 'redis://redis.example.com:7372/12', :namespace => 'mynamespace' }
end

# Push jobs without class definition 
Sidekiq::Client.push('class' => 'Example::Workers::Trace', 'args' => ['hello!'])

# Push jobs overriding default's 
Sidekiq::Client.push('queue' => 'example', 'retry' => 3, 'class' =>     'Example::Workers::Trace', 'args' => ['hello!'])

但是,鉴于我已经调用了 aSidekiq.configure_clientSidekiq.configure_serverfrom app1,因此在这之间可能需要发生一些事情。

显然,我可以直接从 Sidekiq 内部获取序列化和规范化代码并手动推送到app2的 redis 队列,但这似乎是一个脆弱的解决方案。我希望能够使用该Client.push功能。

我想我理想的解决方案是这样的:

SidekiqTWO.configure_client { remote connection..... } SidekiqTWO::Client.push(job....)

甚至:

$redis_remote = remote_connection.....

Sidekiq::Client.push(job, $redis_remote)

显然有点滑稽,但这是我理想的用例。

谢谢!

4

3 回答 3

10

所以一件事是根据常见问题解答,“Sidekiq 消息格式非常简单和稳定:它只是 JSON 格式的哈希。” 强调我的——我不认为将 JSON 发送到 sidekiq 太脆弱了。尤其是当您想要围绕将作业发送到哪个 Redis 实例进行细粒度控制时,就像在 OP 的情况下一样,我可能只需要编写一个小包装器,让我可以指示 Redis 实例以及正在排队的作业。

对于 Kevin Bedell 将循环作业循环到 Redis 实例的更一般情况,我想您不想控制使用哪个 Redis 实例——您只想排队并自动管理分发。到目前为止,似乎只有一个人提出了这个要求,他们想出了一个解决方案,它使用Redis::Distributed

datastore_config = YAML.load(ERB.new(File.read(File.join(Rails.root, "config", "redis.yml"))).result)

datastore_config = datastore_config["defaults"].merge(datastore_config[::Rails.env])

if datastore_config[:host].is_a?(Array)
  if datastore_config[:host].length == 1
    datastore_config[:host] = datastore_config[:host].first
  else
    datastore_config = datastore_config[:host].map do |host|
      host_has_port = host =~ /:\d+\z/

      if host_has_port
        "redis://#{host}/#{datastore_config[:db] || 0}"
      else
        "redis://#{host}:#{datastore_config[:port] || 6379}/#{datastore_config[:db] || 0}"
      end
    end
  end
end

Sidekiq.configure_server do |config|
  config.redis = ::ConnectionPool.new(:size => Sidekiq.options[:concurrency] + 2, :timeout => 2) do
    redis = if datastore_config.is_a? Array
      Redis::Distributed.new(datastore_config)
    else
      Redis.new(datastore_config)
    end

    Redis::Namespace.new('resque', :redis => redis)
  end
end

在寻求高可用性和故障转移时要考虑的另一件事是获取包含可靠性功能的 Sidekiq Pro:“Sidekiq Pro 客户端可以承受短暂的 Redis 中断。它会在出错时在本地排队作业并尝试交付这些作业一旦连接恢复。” 由于 sidekiq 无论如何都是用于后台进程,因此如果 Redis 实例出现故障,那么短暂的延迟不会影响您的应用程序。如果您的两个 Redis 实例中的一个出现故障并且您正在使用循环,除非您使用此功能,否则您仍然会失去一些工作。

于 2013-11-10T03:40:10.093 回答
4

正如 carols10cents 所说,它非常简单,但由于我总是喜欢封装该功能并能够在其他项目中重用它,因此我从 Hotel Tonight 的博客中更新了一个想法。以下解决方案改进了在 Rails 4.1 和 Spring 预加载器中无法使用的 Hotel Tonight。

目前,我将以下文件添加到lib/remote_sidekiq/

remote_sidekiq.rb

class RemoteSidekiq
  class_attribute :redis_pool
end

remote_sidekiq_worker.rb

require 'sidekiq'
require 'sidekiq/client'

module RemoteSidekiqWorker
  def client
    pool = RemoteSidekiq.redis_pool || Thread.current[:sidekiq_via_pool] || Sidekiq.redis_pool
    Sidekiq::Client.new(pool)
  end

  def push(worker_name, attrs = [], queue_name = "default")
    client.push('args' => attrs, 'class' => worker_name, 'queue' => queue_name)
  end
end

您需要创建一个设置 redis_pool 的初始化程序

配置/初始化程序/remote_sidekiq.rb

url = ENV.fetch("REDISCLOUD_URL")
namespace = 'primary'

redis = Redis::Namespace.new(namespace, redis: Redis.new(url: url))

RemoteSidekiq.redis_pool = ConnectionPool.new(size: ENV['MAX_THREADS'] || 6) { redis }

亚历克斯编辑:

在从来没有版本的 sidekiq 中,而不是行:

redis = Redis::Namespace.new(namespace, redis: Redis.new(url: url))

RemoteSidekiq.redis_pool = ConnectionPool.new(size: ENV['MAX_THREADS'] || 6) { redis }

使用线:

redis_remote_options = {
  namespace: "yournamespace",
  url: ENV.fetch("REDISCLOUD_URL")
}

RemoteSidekiq.redis_pool = Sidekiq::RedisConnection.create(redis_remote_options)

然后,您可以在include RemoteSidekiqWorker任何需要的地方简单地安装模块。任务完成!

****更多更大的环境****

添加 RemoteWorker 模型会带来额外的好处:

  1. 您可以在任何地方重用 RemoteWorker,包括有权访问目标 sidekiq 工作人员的系统。这对调用者是透明的。要在目标 sidekiq 系统中使用“RemoteWorkers”表单,只需不要使用初始化程序,因为它将默认使用本地 Sidekiq 客户端。
  2. 使用 RemoteWorkers 确保始终发送正确的参数(代码 = 文档)
  3. 通过创建更复杂的 Sidekiq 架构来扩大规模对调用者来说是透明的。

这是一个示例 RemoteWorker

class RemoteTraceWorker
  include RemoteSidekiqWorker
  include ActiveModel::Model

  attr_accessor :message

  validates :message, presence: true

  def perform_async
    if valid?
      push(worker_name, worker_args)
    else
      raise ActiveModel::StrictValidationFailed, errors.full_messages
    end
  end

  private

  def worker_name
    :TraceWorker.to_s
  end

  def worker_args
    [message]
  end
end
于 2015-04-27T08:49:50.793 回答
1

我遇到了这个问题并遇到了一些问题,因为我正在使用ActiveJob,这使得从队列中读取消息的方式变得复杂。

基于 ARO 的回答,您仍然需要 redis_pool 设置:

remote_sidekiq.rb

class RemoteSidekiq
  class_attribute :redis_pool
end

配置/初始化程序/remote_sidekiq.rb

url = ENV.fetch("REDISCLOUD_URL")
namespace = 'primary'

redis = Redis::Namespace.new(namespace, redis: Redis.new(url: url))

RemoteSidekiq.redis_pool = ConnectionPool.new(size: ENV['MAX_THREADS'] || 6) { redis }

现在我们将创建一个 ActiveJob Adapter 来代替 worker 来排队请求:

lib/active_job/queue_adapter/remote_sidekiq_adapter.rb

require 'sidekiq'

module ActiveJob
  module QueueAdapters
    class RemoteSidekiqAdapter
      def enqueue(job)
        #Sidekiq::Client does not support symbols as keys
        job.provider_job_id = client.push \
          "class"   => ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper,
          "wrapped" => job.class.to_s,
          "queue"   => job.queue_name,
          "args"    => [ job.serialize ]
      end

      def enqueue_at(job, timestamp)
        job.provider_job_id = client.push \
          "class"   => ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper,
          "wrapped" => job.class.to_s,
          "queue"   => job.queue_name,
          "args"    => [ job.serialize ],
          "at"      => timestamp
      end

      def client
        @client ||= ::Sidekiq::Client.new(RemoteSidekiq.redis_pool)
      end
    end
  end
end

我现在可以使用适配器对事件进行排队:

require 'active_job/queue_adapters/remote_sidekiq_adapter'

class RemoteJob < ActiveJob::Base
  self.queue_adapter = :remote_sidekiq

  queue_as :default

  def perform(_event_name, _data)
    fail "
      This job should not run here; intended to hook into
      ActiveJob and run in another system
    "
  end
end

我现在可以使用普通的 ActiveJob api 对作业进行排队。无论应用程序从队列中读取什么,都需要有一个匹配项RemoteJob才能执行该操作。

于 2016-10-16T19:54:15.357 回答