2

我目前正在构建一个管理仪表板来监控我们的 SQS 队列,受到这篇中型帖子的启发。

我设法在我的自定义 shoryuken middelware 中使用此代码正常工作:

class DashboardMiddleware
  def call(worker_instance, queue, sqs_msg, body)
    redis = Redis.new(url: ENV['REDIS_DASHBOARD_URL'])
    check_if_already_failed(redis, sqs_msg.attributes['SentTimestamp'].to_i)
    redis.incr("sqs-dashboard-enqueued")
    yield
    redis.incr("sqs-dashboard-consumed")
  rescue Exception => e
    data =     {
      id: sqs_msg.attributes['SentTimestamp'].to_i,
      worker: worker_instance.class.to_s,
      queue: queue,
      error: e,
      attributes: sqs_msg.attributes,
      receipt_handle: sqs_msg.receipt_handle,
      body: body,
      enqueued_at: Time.at(sqs_msg.attributes['SentTimestamp'].to_i / 1000)
    }.to_json
    redis.lpush("sqs-dashboard-failures", data)
    raise e
  end

  def check_if_already_failed(redis, job_id)
    jobs = redis.lrange("sqs-dashboard-failures", 0, -1).map { |job| JSON.parse(job) }
    i = 0
    g = nil
    jobs.each do |j|
      g = i if j["id"] == job_id
      i += 1
    end
    unless g.nil?
      redis.lset("sqs-dashboard-failures", g, "DELETED")
      redis.lrem("sqs-dashboard-failures", 1, "DELETED")
    end
  end
end

因此,我在管理仪表板中显示了所有失败的作业,以及有关错误的详细信息。那是第一步。现在我希望能够手动重试那些工作(开始时一个)。我已经在网上搜索了很长时间,但没有找到任何可以做的事情(使用 shoryuken 或直接使用 sqs sdk)。

有没有人知道手动重试失败的消息?我们已经介绍了死信队列,但我们不希望使用它们。

非常感谢任何提示或起点:)

4

0 回答 0