当 Sidekiq 或 Resque 队列没有移动时,是否有一种简单的方法来接收通知?
我们遇到了我们的工人随机死亡并且队列最终静止的问题。在我们致力于解决垂死的工人问题时,我们希望抢占对停滞工作的支持电话。
我编写了一个模块,定期检查工作人员以查看他们最后一次运行的时间。
我创建了一个调用 killStaleWorkers() 的 Cron 作业。它基本上检查 redis 队列以查看当前工作人员开始运行的时间。就我而言,如果作业运行时间超过 2 分钟,则意味着它已经过时,并且可能已冻结。发生这种情况时,您可以做几件事,例如重新启动 sidekiq(我这样做了)。你可以在任何你想要的时间里度过,这取决于工作通常需要多长时间。
这是我的代码。
class SidekiqDoctor
def percentageStale(workers, staleTime, redis)
stale = 0
workers.each do |worker|
key = "worker:" + worker + ":started"
timeStarted = Time.parse(redis.get(key))
if(timeStarted == nil)
timeStarted = Time.now
end
puts "Key: " + key + ", Time started: " + timeStarted.to_f.to_s + ", Stale Time: " + staleTime.to_f.to_s
if(timeStarted.to_f <= staleTime.to_f)
stale = stale + 1
end
end
percentage = (stale.to_f / workers.size.to_f) * 100
return percentage
end
def killStaleWorkers(staleTimeAgo = 2.minutes, redis = Sidekiq.redis { |x| x })
workers = redis.smembers("workers")
currentMachine = Socket.gethostname
existingWorkers = {} #key is PID, value is array of workers
workers.each do |worker|
tokens = worker.split(":")
machine = tokens[0]
pid = tokens[1].split("-")[0]
puts "Machine: " + machine + ", PID: " + pid
if(machine == currentMachine)
begin
Process.getpgid( pid .to_i)
if(existingWorkers[pid] == nil)
existingWorkers[pid] = []
end
existingWorkers[pid] << worker
rescue Errno::ESRCH
#pid doesn't exist
redis.srem("workers", worker)
puts "PID doesn't exist: " + pid
end
end
end
pids = existingWorkers.keys
staleTime = Time.now - staleTimeAgo
puts "Stale time: " + staleTime.to_s
restarted = false
percentStale = 0
for pid in pids
puts "Testing PID: " + pid.to_s
percentStale = percentageStale(existingWorkers[pid], staleTime, redis)
puts "Stale Time %: " + percentStale.to_s
#restart Sidekiq by touching restart.txt, which forces God to restart it
if(percentStale >= 25)
restartGod
restarted = true
end
end
return [restarted, percentStale]
end
end
没有任何现有的工具可以监视缓慢移动/静止的队列。所以我写了一个小 gem,它添加了一些 Sidekiq 中间件,并提供了一个任务,当队列开始变得陈旧时,它将向 Campfire 发送消息。