您的主循环控制了参与者/应用程序的线程。
您的程序所做的只是生成后台进程,但从不运行它们。您需要sleep
在循环中纯粹让后台线程引起注意。
像这里一样让无条件循环产生无限后台进程通常不是一个好主意。应该有一个延迟,或者在那里放置一个条件语句......否则你只会有一个无限循环产生永远不会被调用的东西。
想一想:如果你puts "looping"
只是把它放在你的循环中,而你看不到Running in the background
......你会looping
一遍又一遍地看到。
方法#1:使用every
或after
阻止。
解决此问题的最佳方法不是sleep
在 a中使用loop
,而是使用after
orevery
块,如下所示:
every(0.1) {
on_background
}
或者最重要的是,如果您想确保进程在再次运行之前完全运行,请after
改用:
def run_method
@running ||= false
unless @running
@running = true
on_background
@running = false
end
after(0.1) { run_method }
end
使用 aloop
不是一个好主意,async
除非完成了某种流控制,或者像 with @server.accept
... 这样的阻塞进程,否则它只会无缘无故地拉动 100% 的 CPU 内核。
顺便说一句,您也可以使用now_and_every
...now_and_after
这将立即运行该块,然后在您想要的时间后再次运行它。
every
这个要点显示了使用:
在我看来,理想的情况是:
这是一个粗略但立即可用的示例:
require 'celluloid/current'
class Indefinite
include Celluloid
INTERVAL = 0.5
ONE_AT_A_TIME = true
def self.run!
puts "000a Instantiating."
indefinite = new
indefinite.run
puts "000b Running forever:"
sleep
end
def initialize
puts "001a Initializing."
@mutex = Mutex.new if ONE_AT_A_TIME
@running = false
puts "001b Interval: #{INTERVAL}"
end
def run
puts "002a Running."
unless ONE_AT_A_TIME && @running
if ONE_AT_A_TIME
@mutex.synchronize {
puts "002b Inside lock."
@running = true
on_background
@running = false
}
else
puts "002b Without lock."
on_background
end
end
puts "002c Setting new timer."
after(INTERVAL) { run }
end
def on_background
if ONE_AT_A_TIME
puts "003 Running background processor in foreground."
else
puts "003 Running in background"
end
end
end
Indefinite.run!
puts "004 End of application."
这将是它的输出,如果ONE_AT_A_TIME
是true
:
000a Instantiating.
001a Initializing.
001b Interval: 0.5
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
000b Running forever:
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
如果ONE_AT_A_TIME
是,这将是它的输出false
:
000a Instantiating.
001a Initializing.
001b Interval: 0.5
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
000b Running forever:
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
您需要更多的“事件”而不是“线程”才能正确发出任务并保留范围和状态,而不是在线程/参与者之间发出命令......这是every
andafter
块提供的。除此之外,无论哪种方式都是很好的做法,即使您没有Global Interpreter Lock
要处理的问题,因为在您的示例中,您似乎没有处理阻塞进程。如果你有一个阻塞进程,那么一定会有一个无限循环。但是,由于您最终会在处理一个任务之前生成无限数量的后台任务,因此您需要使用sleep
与您的问题开头类似的方法,或者完全使用不同的策略,然后使用every
and after
which is howCelluloid
它本身鼓励您在处理任何类型的套接字上的数据时进行操作。
方法#2:使用递归方法调用。
这刚刚出现在 Google Group 中。下面的示例代码实际上将允许执行其他任务,即使它是一个无限循环。
这种方法不太理想,因为它可能会产生更多开销,从而产生一系列纤维。
def work
# ...
async.work
end
问题#2:Thread
与Fiber
行为。
第二个问题是为什么以下方法会起作用:loop { Thread.new { puts "Hello" } }
这会产生无限数量的进程线程,这些线程由RVM
直接管理。即使您正在使用的Global Interpreter Lock
中有一个RVM
...这仅意味着没有green threads
使用,这是由操作系统本身提供的...相反,这些是由进程本身处理的。进程的 CPU 调度程序Thread
会毫不犹豫地自行运行。在这个例子中,Thread
运行非常快,然后就死掉了。
与async
任务相比,Fiber
使用了a。所以发生了什么,在默认情况下:
- 进程开始。
- 演员实例化。
- 方法调用调用循环。
- 循环调用
async
方法。
async
方法将任务添加到邮箱。
- 邮箱没有被调用,并且循环继续。
- 另一个
async
任务被添加到邮箱。
- 这无限地继续下去。
以上是因为循环方法本身是一个Fiber
调用,它永远不会被挂起(除非 asleep
被调用!),因此添加到邮箱的附加任务永远不会调用 new Fiber
。A 的Fiber
行为与 a 不同Thread
。这是讨论差异的很好的参考资料:
问题#3:Celluloid
与Celluloid::ZMQ
行为。
第三个问题是为什么include Celluloid
行为不同于Celluloid::ZMQ
...
这是因为Celluloid::ZMQ
使用基于反应器的事件邮箱,而不是Celluloid
使用基于条件变量的邮箱。
阅读有关流水线和执行模式的更多信息:
这就是两个例子之间的区别。如果您对这些邮箱的行为有其他疑问,请随时在Google Group上发帖……您面临的主要动态是与vs. vs.行为GIL
交互的独特性质。Fiber
Thread
Reactor
您可以在此处阅读有关反应器模式的更多信息:
并查看此处使用的特定反应器Celluloid::ZMQ
:
所以在事件邮箱场景中发生的事情是,当sleep
被击中时,这是一个阻塞调用,这会导致反应器移动到邮箱中的下一个任务。
而且,这对您的情况来说是独一无二的,正在使用的特定反应器正在Celluloid::ZMQ
使用一个永恒的 C 库......特别是0MQ
库。该反应器在您的应用程序外部,其行为与自身不同Celluloid::IO
,Celluloid
这也是行为发生与您预期不同的原因。
多核支持替代方案
如果维护状态和范围对您来说并不重要,如果您使用jRuby
orRubinius
不限于一个操作系统线程,与使用MRI
which has the相比Global Interpreter Lock
,您可以实例化多个async
参与者并同时在参与者之间发出调用。
但我的拙见是,使用非常高频率的计时器会更好地为您服务,例如0.001
或0.1
在我的示例中,这对于所有意图和目的来说似乎都是瞬时的,但也允许演员线程有足够的时间来切换光纤和运行邮箱中的其他任务。