0

当 Sidekiq 或 Resque 队列没有移动时,是否有一种简单的方法来接收通知?

我们遇到了我们的工人随机死亡并且队列最终静止的问题。在我们致力于解决垂死的工人问题时,我们希望抢占对停滞工作的支持电话。

4

2 回答 2

0

我编写了一个模块,定期检查工作人员以查看他们最后一次运行的时间。

我创建了一个调用 killStaleWorkers() 的 Cron 作业。它基本上检查 redis 队列以查看当前工作人员开始运行的时间。就我而言,如果作业运行时间超过 2 分钟,则意味着它已经过时,并且可能已冻结。发生这种情况时,您可以做几件事,例如重新启动 sidekiq(我这样做了)。你可以在任何你想要的时间里度过,这取决于工作通常需要多长时间。

这是我的代码。

class SidekiqDoctor
    def percentageStale(workers, staleTime, redis)
        stale = 0

        workers.each do |worker|
            key = "worker:" + worker + ":started"
            timeStarted = Time.parse(redis.get(key))

            if(timeStarted == nil)
                timeStarted = Time.now
            end

            puts "Key: " + key + ", Time started: " + timeStarted.to_f.to_s + ", Stale Time: " + staleTime.to_f.to_s

            if(timeStarted.to_f <= staleTime.to_f)
                stale = stale + 1
            end
        end

        percentage = (stale.to_f / workers.size.to_f) * 100

        return percentage
    end

    def killStaleWorkers(staleTimeAgo = 2.minutes, redis = Sidekiq.redis { |x| x })
        workers = redis.smembers("workers")
        currentMachine = Socket.gethostname
        existingWorkers = {} #key is PID, value is array of workers

        workers.each do |worker|
            tokens = worker.split(":")
            machine = tokens[0]
            pid = tokens[1].split("-")[0]

            puts "Machine: " + machine + ", PID: " + pid
            if(machine == currentMachine)
                begin
                    Process.getpgid( pid .to_i)
                    if(existingWorkers[pid] == nil)
                        existingWorkers[pid] = []
                    end

                      existingWorkers[pid] << worker

                rescue Errno::ESRCH
                    #pid doesn't exist
                    redis.srem("workers", worker)
                    puts "PID doesn't exist: " + pid
                end
            end  
        end

        pids = existingWorkers.keys
        staleTime = Time.now - staleTimeAgo

        puts "Stale time: " + staleTime.to_s
        restarted = false
        percentStale = 0

        for pid in pids
            puts "Testing PID: " + pid.to_s
            percentStale = percentageStale(existingWorkers[pid], staleTime, redis)

            puts "Stale Time %: " + percentStale.to_s
            #restart Sidekiq by touching restart.txt, which forces God to restart it
            if(percentStale >= 25)
                restartGod
                restarted = true
            end
        end

        return [restarted, percentStale]
    end



end
于 2013-02-11T19:32:25.013 回答
0

没有任何现有的工具可以监视缓慢移动/静止的队列。所以我写了一个小 gem,它添加了一些 Sidekiq 中间件,并提供了一个任务,当队列开始变得陈旧时,它将向 Campfire 发送消息。

https://github.com/mpowered/sidekiq-nag

于 2013-02-12T06:20:24.360 回答