我目前正在构建一个管理仪表板来监控我们的 SQS 队列,受到这篇中型帖子的启发。
我设法在我的自定义 shoryuken middelware 中使用此代码正常工作:
class DashboardMiddleware
def call(worker_instance, queue, sqs_msg, body)
redis = Redis.new(url: ENV['REDIS_DASHBOARD_URL'])
check_if_already_failed(redis, sqs_msg.attributes['SentTimestamp'].to_i)
redis.incr("sqs-dashboard-enqueued")
yield
redis.incr("sqs-dashboard-consumed")
rescue Exception => e
data = {
id: sqs_msg.attributes['SentTimestamp'].to_i,
worker: worker_instance.class.to_s,
queue: queue,
error: e,
attributes: sqs_msg.attributes,
receipt_handle: sqs_msg.receipt_handle,
body: body,
enqueued_at: Time.at(sqs_msg.attributes['SentTimestamp'].to_i / 1000)
}.to_json
redis.lpush("sqs-dashboard-failures", data)
raise e
end
def check_if_already_failed(redis, job_id)
jobs = redis.lrange("sqs-dashboard-failures", 0, -1).map { |job| JSON.parse(job) }
i = 0
g = nil
jobs.each do |j|
g = i if j["id"] == job_id
i += 1
end
unless g.nil?
redis.lset("sqs-dashboard-failures", g, "DELETED")
redis.lrem("sqs-dashboard-failures", 1, "DELETED")
end
end
end
因此,我在管理仪表板中显示了所有失败的作业,以及有关错误的详细信息。那是第一步。现在我希望能够手动重试那些工作(开始时一个)。我已经在网上搜索了很长时间,但没有找到任何可以做的事情(使用 shoryuken 或直接使用 sqs sdk)。
有没有人知道手动重试失败的消息?我们已经介绍了死信队列,但我们不希望使用它们。
非常感谢任何提示或起点:)