以下是我假设的,类似于您当前的实现:
class Worker
def initialize queue
@queue = queue
dequeue
end
def dequeue
@queue.pop do |item|
begin
work_on item
ensure
dequeue
end
end
end
def work_on item
case item.type
when :reload_credentials
# magic happens here
else
# more magic happens here
end
end
end
q = EM::Queue.new
workers = Array.new(10) { Worker.new q }
如果我理解正确的话,上面的问题是你不希望工人从事新工作(在生产者时间线中较早到达的工作),而不是任何 reload_credentials 工作。以下内容应为此提供服务(最后还有额外的警告)。
class Worker
def initialize queue
@queue = queue
dequeue
end
def dequeue
@queue.pop do |item|
begin
work_on item
ensure
dequeue
end
end
end
def work_on item
case item.type
when :reload_credentials
# magic happens here
else
# more magic happens here
end
end
end
class LockingDispatcher
def initialize channel, queue
@channel = channel
@queue = queue
@backlog = []
@channel.subscribe method(:dispatch_with_locking)
@locked = false
end
def dispatch_with_locking item
if locked?
@backlog << item
else
# You probably want to move the specialization here out into a method or
# block that's passed into the constructor, to make the lockingdispatcher
# more of a generic processor
case item.type
when :reload_credentials
lock
deferrable = CredentialReloader.new(item).start
deferrable.callback { unlock }
deferrable.errback { unlock }
else
dispatch_without_locking item
end
end
end
def dispatch_without_locking item
@queue << item
end
def locked?
@locked
end
def lock
@locked = true
end
def unlock
@locked = false
bl = @backlog.dup
@backlog.clear
bl.each { |item| dispatch_with_locking item }
end
end
channel = EM::Channel.new
queue = EM::Queue.new
dispatcher = LockingDispatcher.new channel, queue
workers = Array.new(10) { Worker.new queue }
所以,第一个系统的输入是打开的q
,但在这个新系统中它是打开的channel
。queue
仍然用于工作人员之间的工作分配,但在刷新queue
凭据操作正在进行时不会填充。不幸的是,由于我没有花更多的时间,所以我没有概括LockingDispatcher
它不与项目类型和调度代码耦合CredentialsReloader
。我会把它留给你。
您应该在这里注意,虽然我理解您的原始要求,但通常最好放宽这种要求。如果不更改该要求,则基本上无法消除几个突出的问题:
- 在启动凭据作业之前,系统不会等待执行作业完成
- 系统将非常糟糕地处理大量的凭证作业——其他可能可以处理的项目将无法处理。
- 在凭证代码中出现错误的情况下,积压可能会填满 ram 并导致失败。一个简单的超时可能足以避免灾难性影响,如果代码是可中止的,并且后续消息可以充分处理以避免进一步的死锁。
听起来您实际上对系统中有一些用户标识的概念。如果您仔细考虑您的要求,您可能只需要积压与凭据处于刷新状态的用户 ID 相关的项目。这是一个不同的问题,涉及不同类型的调度。为这些用户尝试锁定积压的哈希,在凭证完成时回调以将这些积压排入工作人员,或一些类似的安排。
祝你好运!