2

这是我第一次尝试Perl 对象环境(POE)。我正在尝试创建一个程序,该程序将作为服务运行,该服务通过 couchdb 获取设备列表,生成子进程以 60 秒间隔 ping 它们,同时将并发子进程的最大数量限制为 3。

在延迟(1 分钟)后,我能够成功地将子进程重新排队,但是,我不确定如何管理调用同一事件的多个警报/延迟。我正在尝试将它们存储在$_[HEAP]->{timers}->{$_}$_定主机的位置。

#!/usr/bin/perl

use warnings;
use strict;
use POE qw(Wheel::Run Filter::Reference);
use CouchDB::Client;
use Data::Dumper;

use constant MAX_CONCURRENT_TASKS => 3;

our $couch      = CouchDB::Client->new(uri => 'http://192.168.1.100:5984/')->newDB('devices');

POE::Session->create(
        inline_states => {
                _start      => sub {
                        push (@{$_[HEAP]->{devices}}, $couch->newDoc($_->id)->retrieve->id) for @{$couch->listDocs};
                        $_[KERNEL]->delay_set('refresh', 60);
                        $_[HEAP]->{timers}->{$_} = $_[KERNEL]->delay_set('spawn', 1, $_) for @{$_[HEAP]->{devices}};
                }, 
                refresh => sub { 
                        undef @{$_[HEAP]->{devices}};
                        $_[KERNEL]->delay_set('refresh', 60); 
                        push (@{$_[HEAP]->{devices}}, $couch->newDoc($_->id)->retrieve->id) for @{$couch->listDocs};
                        print "\nRefreshing device list.\n\n";
                }, 
                spawn   => sub { 
                        if (keys(%{$_[HEAP]->{task}}) < MAX_CONCURRENT_TASKS) { 
                                print "Starting $_[ARG0].\t # of tasks running: ". keys(%{$_[HEAP]->{task}}),$/;
                                my $host = $_[ARG0];
                                my $task = POE::Wheel::Run->new(        
                                        Program      => sub { \&do_stuff($host) },
                                        StdoutFilter => POE::Filter::Reference->new(),
                                        StdoutEvent  => "task_result",
                                        StderrEvent  => "task_debug",
                                        CloseEvent   => "task_done"
                                );
                                $_[HEAP]->{task}->{$task->ID} = $task;
                                $_[KERNEL]->sig_child($task->PID, "sig_child", $_[ARG0]) 
                        } else { 
                                $_[KERNEL]->delay_adjust($_[HEAP]->{timers}->{$_[ARG0]}, 5);
                        }
                        print Dumper 'spawn', sort $_[HEAP]->{timers};
                },
                task_result => sub { 
                        print "Result for $_[ARG0]->{task}: $_[ARG0]->{status}\n";
                },
                task_done   => sub { 
                        delete $_[HEAP]->{task}->{$_[ARG0]};
                },
                task_debug  => sub { 
                        print "Debug: $_[ARG0]\n";
                },
                sig_child   => sub { 
                        delete $_[HEAP]->{$_[ARG1]};
                        $_[HEAP]->{timers}->{$_[ARG3]} = $_[KERNEL]->delay_set('spawn', 60, $_[ARG3]) if $_[ARG3] ~~ $_[HEAP]->{devices};  
                        $_[KERNEL]->alarm_remove($_[HEAP]->{timers}->{$_[ARG0]});
                }
        }
);
sub do_stuff {
        binmode(STDOUT);
        my $filter = POE::Filter::Reference->new();

        sleep(rand 5);

        my %result = (
                task   => shift,
                status => "complete.",
        );

        my $output = $filter->put([\%result]);
        print @$output;
}
POE::Kernel->run();
exit 0;

欢迎任何建议/策略。

编辑 1:我发现这$_[KERNEL]->delay不是为每个孩子设置计时器。我可以通过使用$_[KERNEL]->delay_set来让它工作。我现在无法拼凑的是如何限制程序在给定时间运行超过 3 个进程。

我正在创建初始计时器_start。如果子进程计数spawn$_[KERNEL]->delay_adjust3 或更高,则应将延迟延长 5 秒。

为回答问题所花费的时间道歉,这生活在我的工作电脑上,这个编辑是在星期一,我回来的第一天。

4

0 回答 0