3

我正在实现一个分布式 cronjob 执行系统(所谓的cron 计算集群)。当操作时间到了时,Cronjobs 应该排队到消息队列 (RabbitMQ) 中。另一方面(集群的节点/工作人员)是一个 Perl 守护进程AnyEvent::RabbitMQ,用于从消息队列中接收一个 cronjob/task/消息,处理该任务并从消息队列中请求另一个恰好一个 cronjob/task/消息,然后很快。

我使用 RabbitMQ 的心跳功能AnyEvent::RabbitMQ来帮助 RabbitMQ 识别断开的连接。

没关系心跳间隔的实际值!我也有很长的工作需要几天的时间。因此,将间隔设置为最长的 cronjob 将不是一种选择。

请参阅以下代码片段以在 Perl 守护进程工作程序中执行实际的 cronjob。它是在“AnyEvent->timer”中实现的,不会对消息执行 DoSing RabbitMQ。使用此方法是因为 RabbitMQconsume被禁止(被管理)。

sub _timer_tick {

  $rabbitmq_channel->get(
    queue      => 'job_queue',
    on_success => sub {
      my ($amqp_method) = @_;
      if ( not $amqp_method->{empty} ) {
        pause_timer();
        progress_job($amqp_method);
        resume_timer();
      }
    },
    on_failure => sub { $quit_programm->send( 'RABBITMQ_ERROR', @_ ) },
  );

  return;
}

progress_job()是解析消息并执行作业的位置。pause_timer()resume_timer()控制AnyEvent->timer触发_timer_tick()

use Capture::Tiny 'capture';
sub progress_job {
  my ($amqp_method) = @_;
  my $job = decode_json( $amqp_method->{body}->to_raw_payload() );
  my ( $stdout, $stderr, $exit ) = capture {
    system $job->{execute};
  };
  return;
}

第一个长时间运行的作业进入,系统“崩溃”并显示各种错误消息。有时它会抛出“未知频道 ID:1”,有时它会抛出“频道已关闭”。所以我做了“愚蠢的调试”(试图弄乱配置),发现当heartbeat间隔短于progress_job()这些错误中所用的时间时,会抛出这些错误。经过一番思考,这是有道理的。progress_job()是一个阻塞子程序,AnyEvent 无法继续向 RabbitMQ 发送心跳包。

我对解决阻塞-heatbeat 问题的第一个快速想法是progress_job()在子进程中分叉并执行。FORK 上的 AnyEvents 文档指出,fork当子进程无法访问事件系统(例如通过 AnyEvent)时,它可以保存以供使用。下一个想法:好的,没有访问事件系统的权限,所以我可以做 fork。但是:计时器应该在返回resume_timer()后恢复()progress_job()。理论上resume_timer()会在返回之后fork()而不是之后调用。progress_job()所以我停止了我的实施。

我的问题:如何解决最后一点?如何resume_timer()progress_job()(或换句话说分叉的孩子)返回之后?resume_timer()由于分叉和事件系统不是线程安全的,我无法放入孩子中。

4

1 回答 1

3

除非程序被使用 AE 感知调用阻止,否则 AE 无法处理事件。system不知道AE。使用run_cmdAnyEvent ::Util代替。

于 2016-01-18T12:56:43.033 回答