我将讨论你的部分问题:如何编写一个长时间运行的 Perl 程序来处理 IO。
编写处理许多同时 IO 操作的 Perl 程序的最有效方法是使用事件循环。这将允许我们为事件编写处理程序,例如“命名管道上出现一行”或“电子邮件已成功发送”或“我们收到 SIGINT”。至关重要的是,它将允许我们在一个程序中组合任意数量的这些事件处理程序。这意味着您可以“多任务”,但仍然可以轻松地在任务之间共享状态。
我们将使用AnyEvent框架。它允许我们编写事件处理程序,称为观察程序,它可以与 Perl 支持的任何事件循环一起工作。您可能并不关心您使用哪个事件循环,因此这种抽象可能对您的应用程序无关紧要。但它会让我们重用 CPAN 上可用的预先编写的事件处理程序;AnyEvent::SMTP处理电子邮件,AnyEvent::Subprocess与子进程交互,AnyEvent::Handle处理管道,等等。
基于 AnyEvent 的守护进程的基本结构非常简单。你创建了一些观察者,进入事件循环,然后……就是这样;事件系统会做其他所有事情。首先,让我们编写一个每五秒打印一次“Hello”的程序。
我们首先加载模块:
use strict;
use warnings;
use 5.010;
use AnyEvent;
然后,我们将创建一个时间观察者,或“计时器”:
my $t = AnyEvent->timer( after => 0, interval => 5, cb => sub {
say "Hello";
});
请注意,我们将计时器分配给一个变量。只要$t
在范围内,这就会使计时器保持活动状态。如果我们说undef $t
,那么计时器将被取消,并且永远不会调用回调。
关于回调,就是sub { ... }
after cb =>
,这就是我们处理事件的方式。当事件发生时,回调被调用。我们做我们的事情,返回,事件循环继续根据需要调用其他回调。你可以在回调中做任何你想做的事情,包括取消和创建其他观察者。只是不要进行阻塞呼叫,例如system("/bin/sh long running process")
ormy $line = <$fh>
或sleep 10
。任何阻塞都必须由观察者完成;否则,事件循环将无法在等待该任务完成时运行其他处理程序。
现在我们有了一个计时器,我们只需要进入事件循环。通常,您将选择要使用的事件循环,并以事件循环文档描述的特定方式输入它。 EV是一个不错的选择,您可以通过调用 来输入它EV::loop()
。但是,我们将让 AnyEvent 通过编写AnyEvent->condvar->recv
. 不要担心这是做什么的;这是一个成语,意思是“进入事件循环并且永不返回”。(在阅读 AnyEvent 时,您会看到很多关于条件变量或 condvars 的内容。它们非常适合文档和单元测试中的示例,但您真的不想在程序中使用它们。如果您'在里面使用它们.pm
文件,你做错了什么。所以只要假装它们现在不存在,你就会从一开始就写出非常干净的代码。这将使您领先于许多 CPAN 作者!)
所以,为了完整起见:
AnyEvent->condvar->recv;
如果你运行那个程序,它会每五秒打印一次“Hello”,直到宇宙结束,或者,更有可能的是,你用控制 c 杀死它。这样做的好处是,您可以在打印“Hello”之间的那五秒钟内做其他事情,而您只需添加更多观察者即可。
所以,现在开始从管道中读取。AnyEvent 使用其 AnyEvent::Handle 模块使这变得非常容易。AnyEvent::Handle 可以连接到套接字或管道,并在有数据可供读取时调用回调。(它还可以进行非阻塞写入、TLS 和其他事情。但我们现在不关心这些。)
首先,我们需要打开一个管道:
use autodie 'open';
open my $fh, '<', '/path/to/pipe';
然后,我们用 AnyEvent::Handle 包装它。创建 Handle 对象后,我们将在此管道上的所有操作中使用它。你可以完全忘记$fh
,AnyEvent::Handle 将直接处理触摸它。
my $h = AnyEvent::Handle->new( fh => $fh );
现在我们可以$h
在管道可用时从管道中读取行:
$h->push_read( line => sub {
my ($h, $line, $eol) = @_;
say "Got a line: $line";
});
当下一行可用时,这将调用打印“Got a line”的回调。如果要继续读取行,则需要使函数将自身推回读取队列,例如:
my $handle_line; $handle_line = sub {
my ($h, $line, $eol) = @_;
say "Got a line: $line";
$h->push_read( line => $handle_line );
};
$h->push_read( line => $handle_line );
这将读取行并调用$handle_line->()
每一行,直到文件关闭。如果你想早点停止阅读,那很容易......push_read
在这种情况下不要再这样做了。(您不必在行级别阅读;您可以要求在任何字节可用时调用您的回调。但这更复杂,留给读者练习。)
所以现在我们可以将这一切结合到一个处理读取管道的守护进程中。我们要做的是:为行创建一个处理程序,打开管道并处理行,最后设置一个信号处理程序以干净地退出程序。我建议采用面向对象的方法来解决这个问题;使每个动作(“处理来自访问日志文件的行”)成为一个带有start
andstop
方法的类,实例化一堆动作,设置一个信号处理程序来干净地停止动作,启动所有动作,然后进入事件循环。有很多代码与这个问题并不真正相关,所以我们会做一些更简单的事情。但是在设计程序时请记住这一点。
#!/usr/bin/env perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::Handle;
use EV;
use autodie 'open';
use 5.010;
my @handles;
my $abort; $abort = AnyEvent->signal( signal => 'INT', cb => sub {
say "Exiting.";
$_->destroy for @handles;
undef $abort;
# all watchers destroyed, event loop will return
});
my $handler; $handler = sub {
my ($h, $line, $eol) = @_;
my $name = $h->{name};
say "$name: $line";
$h->push_read( line => $handler );
};
for my $file (@ARGV) {
open my $fh, '<', $file;
my $h = AnyEvent::Handle->new( fh => $fh );
$h->{name} = $file;
$h->push_read( line => $handler );
}
EV::loop;
现在你有了一个程序,它从任意数量的管道中读取一行,打印在任何管道上接收到的每一行(以管道的路径为前缀),并在你按下 Control-C 时干净地退出!