0

我在配置 sidekiq 服务器时遇到问题,刷新页面后,该进程似乎正在前台运行。/consumers/fetch 我需要将其永久置于后台。

消费者控制器.rb

require 'kafka'

class ConsumersController < ApplicationController

  def fetch

    @consumer = Kafka::Consumer.new( { :host => ENV["host"],
        :port => ENV["port"],
        :topic => ENV["topic"]})

    @consumer.loop do |message|
      logger.info "-------------#{message.inspect}--------------"
      logger.info "-------------#{message.first.payload.inspect}--------------"
      unless message.blank?
        ConsumerWorker.perform_async(message.first.payload)
      end
    end
  end
end

consumer_worker.rb

class ConsumerWorker

  include Sidekiq::Worker

  def perform(message)
    payload = message.first["payload"]
    hash = JSON.parse(payload)
    return @message = Message.new(hash) if hash["concern"] == 'order_create' or hash["concern"] == 'first_payment'
  end

end

消息.rb

class Message

  attr_reader :bundle_id, :order_id, :order_number, :event

  def initialize(message)
    @payload = message["payload"]
    @bundle_id = @payload["bundle_id"]
    @order_id  = @payload["order_id"]
    @order_number = @payload["order_number"]
    @event = message["concern"]
  end
end
4

1 回答 1

0

我认为你需要移动这个块

@consumer.loop do |message|
end

不知何故在你的工作人员内部,因为我认为消费是在块执行之后完成的。

于 2013-08-01T09:33:28.297 回答