14

尝试在我的工作示例中实现赛璐珞异步似乎表现出奇怪的行为。

这里我的代码看起来

 class Indefinite
    include Celluloid

      def run!
         loop do 
           [1].each do |i|
             async.on_background
           end
         end
      end 


       def on_background
         puts "Running in background" 
       end
   end

   Indefinite.new.run!

但是当我运行上面的代码时,我从来没有看到“在后台运行

但是,如果我睡觉,代码似乎可以工作。

class Indefinite
   include Celluloid

    def run! 
      loop do 
        [1].each do |i|
          async.on_background
        end
        sleep 0.5
      end 
    end


   def on_background
     puts "Running in background" 
   end
 end

 Indefinite.new.run!

任何想法?为什么在上述两种情况下会有这样的差异。

谢谢。

4

3 回答 3

17

您的主循环控制了参与者/应用程序的线程。

您的程序所做的只是生成后台进程,但从不运行它们。您需要sleep在循环中纯粹让后台线程引起注意。

像这里一样让无条件循环产生无限后台进程通常不是一个好主意。应该有一个延迟,或者在那里放置一个条件语句......否则你只会有一个无限循环产生永远不会被调用的东西。

想一想:如果你puts "looping"只是把它放在你的循环中,而你看不到Running in the background......你会looping一遍又一遍地看到。


方法#1:使用everyafter阻止。

解决此问题的最佳方法不是sleep在 a中使用loop,而是使用afterorevery块,如下所示:

every(0.1) {
    on_background
}

或者最重要的是,如果您想确保进程在再次运行之前完全运行,请after改用:

def run_method
    @running ||= false
    unless @running
        @running = true
        on_background
        @running = false
    end
    after(0.1) { run_method }
 end

使用 aloop不是一个好主意,async除非完成了某种流控制,或者像 with @server.accept... 这样的阻塞进程,否则它只会无缘无故地拉动 100% 的 CPU 内核。

顺便说一句,您也可以使用now_and_every...now_and_after这将立即运行该块,然后在您想要的时间后再次运行它。

every这个要点显示了使用:


在我看来,理想的情况是:

这是一个粗略但立即可用的示例:


require 'celluloid/current'

class Indefinite
  include Celluloid

  INTERVAL = 0.5
  ONE_AT_A_TIME = true

  def self.run!
    puts "000a Instantiating."
    indefinite = new
    indefinite.run
    puts "000b Running forever:"
    sleep
  end

  def initialize
    puts "001a Initializing."
    @mutex = Mutex.new if ONE_AT_A_TIME
    @running = false
    puts "001b Interval: #{INTERVAL}"
  end

  def run
    puts "002a Running."
    unless ONE_AT_A_TIME && @running
      if ONE_AT_A_TIME
        @mutex.synchronize {
          puts "002b Inside lock."
          @running = true
          on_background
          @running = false
        }
      else
        puts "002b Without lock."
        on_background
      end
    end
    puts "002c Setting new timer."
    after(INTERVAL) { run }
  end


  def on_background
    if ONE_AT_A_TIME
      puts "003 Running background processor in foreground."
    else
      puts "003 Running in background"
    end
  end
end

Indefinite.run!
puts "004 End of application."

这将是它的输出,如果ONE_AT_A_TIMEtrue

000a Instantiating.
001a Initializing.
001b Interval: 0.5
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
000b Running forever:
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.

如果ONE_AT_A_TIME是,这将是它的输出false

000a Instantiating.
001a Initializing.
001b Interval: 0.5
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
000b Running forever:
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.

您需要更多的“事件”而不是“线程”才能正确发出任务并保留范围和状态,而不是在线程/参与者之间发出命令......这是everyandafter块提供的。除此之外,无论哪种方式都是很好的做法,即使您没有Global Interpreter Lock要处理的问题,因为在您的示例中,您似乎没有处理阻塞进程。如果你有一个阻塞进程,那么一定会有一个无限循环。但是,由于您最终会在处理一个任务之前生成无限数量的后台任务,因此您需要使用sleep与您的问题开头类似的方法,或者完全使用不同的策略,然后使用everyand afterwhich is howCelluloid它本身鼓励您在处理任何类型的套接字上的数据时进行操作。


方法#2:使用递归方法调用。

这刚刚出现在 Google Group 中。下面的示例代码实际上将允许执行其他任务,即使它是一个无限循环。

这种方法不太理想,因为它可能会产生更多开销,从而产生一系列纤维。

def work
    # ...
    async.work
end

问题#2:ThreadFiber行为。

第二个问题是为什么以下方法会起作用:loop { Thread.new { puts "Hello" } }

这会产生无限数量的进程线程,这些线程由RVM直接管理。即使您正在使用的Global Interpreter Lock中有一个RVM...这仅意味着没有green threads使用,这是由操作系统本身提供的...相反,这些是由进程本身处理的。进程的 CPU 调度程序Thread会毫不犹豫地自行运行。在这个例子中,Thread运行非常快,然后就死掉了。

async任务相比,Fiber使用了a。所以发生了什么,在默认情况下:

  1. 进程开始。
  2. 演员实例化。
  3. 方法调用调用循环。
  4. 循环调用async方法。
  5. async方法将任务添加到邮箱。
  6. 邮箱没有被调用,并且循环继续。
  7. 另一个async任务被添加到邮箱。
  8. 这无限地继续下去。

以上是因为循环方法本身是一个Fiber调用,它永远不会被挂起(除非 asleep被调用!),因此添加到邮箱的附加任务永远不会调用 new Fiber。A 的Fiber行为与 a 不同Thread。这是讨论差异的很好的参考资料:


问题#3:CelluloidCelluloid::ZMQ行为。

第三个问题是为什么include Celluloid行为不同于Celluloid::ZMQ...

这是因为Celluloid::ZMQ使用基于反应器的事件邮箱,而不是Celluloid使用基于条件变量的邮箱。

阅读有关流水线和执行模式的更多信息:

这就是两个例子之间的区别。如果您对这些邮箱的行为有其他疑问,请随时在Google Group上发帖……您面临的主要动态是与vs. vs.行为GIL交互的独特性质。FiberThreadReactor

您可以在此处阅读有关反应器模式的更多信息:

并查看此处使用的特定反应器Celluloid::ZMQ

所以在事件邮箱场景中发生的事情是,当sleep被击中时,这是一个阻塞调用,这会导致反应器移动到邮箱中的下一个任务。

而且,这对您的情况来说是独一无二的,正在使用的特定反应器正在Celluloid::ZMQ使用一个永恒的 C 库......特别是0MQ库。该反应器在您的应用程序外部,其行为与自身不同Celluloid::IOCelluloid这也是行为发生与您预期不同的原因。

多核支持替代方案

如果维护状态和范围对您来说并不重要,如果您使用jRubyorRubinius不限于一个操作系统线程,与使用MRIwhich has the相比Global Interpreter Lock,您可以实例化多个async参与者并同时在参与者之间发出调用。

但我的拙见是,使用非常高频率的计时器会更好地为您服务,例如0.0010.1在我的示例中,这对于所有意图和目的来说似乎都是瞬时的,但也允许演员线程有足够的时间来切换光纤和运行邮箱中的其他任务。

于 2015-09-14T16:55:21.353 回答
4

让我们做一个实验,稍微修改一下你的例子(我们修改它是因为这样我们得到了相同的“奇怪”行为,同时让事情变得更清晰):

class Indefinite
  include Celluloid

  def run!
    (1..100).each do |i|
      async.on_background i
    end
    puts "100 requests sent from #{Actor.current.object_id}"
  end 

  def on_background(num)
    (1..100000000).each {}
    puts "message #{num} on #{Actor.current.object_id}" 
  end
end

Indefinite.new.run!
sleep

# =>
# 100 requests sent from 2084
# message 1 on 2084
# message 2 on 2084
# message 3 on 2084
# ...

你可以在任何 Ruby 解释器上运行它,使用Celluloidor Celluloid::ZMQ,结果总是一样的。另请注意,Actor.current.object_id两种方法的输出是相同的,这为我们提供了线索,即我们在实验中处理的是单个参与者。

所以ruby和赛璐珞的实现没有太大区别,只要关注这个实验。

让我们首先解决为什么这段代码会这样?

不难理解为什么会这样。赛璐珞正在接收传入的请求并将它们保存在任务队列中以供适当的参与者使用。请注意,我们最初的调用run!是在队列的顶部。

然后赛璐珞处理这些任务,一次一个。如果发生阻塞调用或sleep调用,根据文档,将调用下一个任务,而不是等待当前任务完成。

请注意,在我们的实验中没有阻塞调用。这意味着,该run!方法将从头到尾执行,并且只有在完成之后,每个on_background调用才会以完美的顺序被调用。

这就是它应该如何工作的方式。

如果您sleep在代码中添加调用,它会通知赛璐珞,它应该开始处理队列中的下一个任务。因此,您在第二个示例中的行为。

现在让我们继续讨论如何设计系统,使其不依赖于sleep调用,这至少很奇怪。

实际上在赛璐珞-ZMQ 项目页面上有一个很好的例子。注意这个循环:

def run
  loop { async.handle_message @socket.read }
end

它做的第一件事是@socket.read。请注意,这是一个阻塞操作。因此,赛璐珞将处理队列中的下一条消息(如果有的话)。一旦@socket.read响应,将生成一个新任务。但是在再次调用之前不会执行此任务@socket.read,从而阻塞执行,并通知赛璐珞处理队列中的下一项。

您可能会看到与您的示例不同的地方。你没有阻止任何东西,因此没有给赛璐珞一个处理队列的机会。

我们如何获得Celluloid::ZMQ示例中给出的行为?

第一个(在我看来,更好的)解决方案是进行实际阻塞调用,例如@socket.read.

如果您的代码中没有阻塞调用,并且您仍然需要在后台处理事情,那么您应该考虑Celluloid.

赛璐珞有几种选择。可以使用conditionsfuturesnotifications或只是在低级别调用wait/ signal,如下例所示:

class Indefinite
  include Celluloid

  def run!
    loop do
      async.on_background
      result = wait(:background) #=> 33
    end
  end 

  def on_background
    puts "background" 

    # notifies waiters, that they can continue
    signal(:background, 33)
  end
end

Indefinite.new.run!
sleep

# ...
# background
# background
# background
# ...

sleep(0)_Celluloid::ZMQ

我还注意到您在评论中提到的working.rb文件。它包含以下循环:

loop { [1].each { |i|  async.handle_message 'hello' } ; sleep(0) }

看起来它正在做正确的工作。实际上,在jRuby显示下运行它,它正在泄漏内存。为了让它更明显,尝试在handle_messagebody 中添加一个 sleep 调用:

def handle_message(message)
  sleep 0.5
  puts "got message: #{message}"
end

高内存使用可能与队列填充非常快并且无法在给定时间内处理的事实有关。问题会更大,如果handle_message工作量更大,那就是现在。

解决方案sleep

我对使用sleep. 它们可能需要大量内存,甚至会产生内存泄漏。而且还不清楚您应该将什么作为参数传递给sleep方法以及为什么。

于 2015-09-20T14:44:01.223 回答
2

线程如何与赛璐珞配合使用

赛璐珞不会为每个异步任务创建一个新线程。它有一个线程池,在其中运行每个任务,同步和异步任务。关键是该库将run!函数视为同步任务,并在与异步任务相同的上下文中执行它。

默认情况下,赛璐珞在单个线程中运行所有内容,使用队列系统为以后安排异步任务。它仅在需要时创建新线程。

除此之外,赛璐珞覆盖了该sleep功能。这意味着每次调用sleep扩展类的Celluloid类时,库都会检查其池中是否有非睡眠线程。在您的情况下,第一次调用sleep 0.5时,它将创建一个新的 Thread 以在第一个线程处于睡眠状态时执行队列中的异步任务。

所以在你的第一个例子中,只有一个赛璐珞线程正在运行,执行循环。在您的第二个示例中,两个赛璐珞线程正在运行,第一个执行循环并在每次迭代时休眠,另一个执行后台任务。

例如,您可以更改您的第一个示例以执行有限次数的迭代:

def run! 
  (0..100).each do
    [1].each do |i|
      async.on_background
    end
  end
  puts "Done!"
end

使用此run!函数时,您会看到Done!在所有 之前打印Running in background,这意味着赛璐珞run!在启动同一线程中的异步任务之前完成了函数的执行。

于 2015-09-14T12:55:23.587 回答