我正在实现一个分布式 cronjob 执行系统(所谓的cron 计算集群)。当操作时间到了时,Cronjobs 应该排队到消息队列 (RabbitMQ) 中。另一方面(集群的节点/工作人员)是一个 Perl 守护进程AnyEvent::RabbitMQ
,用于从消息队列中接收一个 cronjob/task/消息,处理该任务并从消息队列中请求另一个恰好一个 cronjob/task/消息,然后很快。
我使用 RabbitMQ 的心跳功能AnyEvent::RabbitMQ
来帮助 RabbitMQ 识别断开的连接。
没关系心跳间隔的实际值!我也有很长的工作需要几天的时间。因此,将间隔设置为最长的 cronjob 将不是一种选择。
请参阅以下代码片段以在 Perl 守护进程工作程序中执行实际的 cronjob。它是在“AnyEvent->timer”中实现的,不会对消息执行 DoSing RabbitMQ。使用此方法是因为 RabbitMQconsume
被禁止(被管理)。
sub _timer_tick {
$rabbitmq_channel->get(
queue => 'job_queue',
on_success => sub {
my ($amqp_method) = @_;
if ( not $amqp_method->{empty} ) {
pause_timer();
progress_job($amqp_method);
resume_timer();
}
},
on_failure => sub { $quit_programm->send( 'RABBITMQ_ERROR', @_ ) },
);
return;
}
progress_job()
是解析消息并执行作业的位置。pause_timer()
并resume_timer()
控制AnyEvent->timer
触发_timer_tick()
。
use Capture::Tiny 'capture';
sub progress_job {
my ($amqp_method) = @_;
my $job = decode_json( $amqp_method->{body}->to_raw_payload() );
my ( $stdout, $stderr, $exit ) = capture {
system $job->{execute};
};
return;
}
第一个长时间运行的作业进入,系统“崩溃”并显示各种错误消息。有时它会抛出“未知频道 ID:1”,有时它会抛出“频道已关闭”。所以我做了“愚蠢的调试”(试图弄乱配置),发现当heartbeat
间隔短于progress_job()
这些错误中所用的时间时,会抛出这些错误。经过一番思考,这是有道理的。progress_job()
是一个阻塞子程序,AnyEvent 无法继续向 RabbitMQ 发送心跳包。
我对解决阻塞-heatbeat 问题的第一个快速想法是progress_job()
在子进程中分叉并执行。FORK 上的 AnyEvents 文档指出,fork
当子进程无法访问事件系统(例如通过 AnyEvent)时,它可以保存以供使用。下一个想法:好的,没有访问事件系统的权限,所以我可以做 fork。但是:计时器应该在返回resume_timer()
后恢复()progress_job()
。理论上resume_timer()
会在返回之后fork()
而不是之后调用。progress_job()
所以我停止了我的实施。
我的问题:如何解决最后一点?如何resume_timer()
在progress_job()
(或换句话说分叉的孩子)返回之后?resume_timer()
由于分叉和事件系统不是线程安全的,我无法放入孩子中。