我正在尝试制作一个队列管理器,以便在特定文件夹中创建文件时获取作业。我使用 AnyEvent 创建了我的代码,所以它是异步的。我的问题是,我正在尝试使用回调从子程序 add_route 和 del_route 传递返回值,但是 AE::timer 不会停止,并且回调获取的值不会保存在变量中$return_code。我哪里出错了?
#!/usr/bin/perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::Filesys::Notify;
use Const::Fast;
use DDP;
use File::Basename;
use File::Copy;
use File::Slurp;
use FindBin '$Bin';
use List::Util qw(first);
use Regexp::Common qw(net);
use v5.10.1;
const my $true => 1;
const my $false => 0;
my $cv = AE::cv;
my $jobs_folder_path = $Bin . '/jobs';
my $interval = 5;
my $after = 10;
my %jobs_folders = (
"new" => "$jobs_folder_path/new",
"progress" => "$jobs_folder_path/progress",
"failed" => "$jobs_folder_path/failed",
);
my $notifier = AnyEvent::Filesys::Notify->new(
dirs => [ $jobs_folders{'new'} ],
interval => $interval,
cb => sub {
my (@events) = @_;
for my $event (@events) {
if ($event->is_created) {
process_new_job($event->path);
}
}
}
);
my $timer = AE::timer $after, $interval, sub {
my @files = read_dir($jobs_folders{'progress'}, prefix => $true);
if (@files) {
foreach my $file (@files) {
my $file_name = basename($file);
my $line = read_file($file);
for ($file_name) {
when (/add/) {
my ($ip_address, $next_hop) = split(/ /, $line);
my $return_code;
my $cb = sub {
my $ret_val = shift;
$return_code = $ret_val;
};
add_route($ip_address, $next_hop, $cb);
print $return_code;
#post_job_process($return_code, $file_name);
}
when (/del/) {
my ($ip_address) = $line;
my $return_code;
my $cb = sub {
my $ret_val = shift;
$return_code = $ret_val;
};
del_route($ip_address, $cb);
print $return_code, "\n";
#post_job_process($return_code, $file_name);
}
}
}
}
};
$cv->recv;
sub process_new_job {
my ($new_job) = shift;
my $file_name = basename($new_job);
move("$jobs_folders{'new'}/$file_name", "$jobs_folders{'progress'}/$file_name");
}
sub post_job_process {
my ($return_code, $file_name) = @_;
if ($return_code == $false) {
move("$jobs_folders{'progress'}/$file_name", "$jobs_folders{'failed'}/$file_name");
send_email();
}
}
sub send_email {
print "Sending Email...\n";
}
sub add_route {
my ($ip_address, $next_hop, $cb) = @_;
my $attempt = 0;
my $sleep = 10;
my $add_timer; $add_timer = AE::timer 0, $sleep, sub {
if ($attempt++ >= 3) {
undef $add_timer;
$cb->($false);
}
print "$attempt. Adding Route $ip_address via $next_hop\n";
my @addresses = get_routing_table();
my ($comparable_ip) = $ip_address =~ /($RE{net}{IPv4})\/32$/;
my $is_in_routing_table = first { $_->{'ip_address'} eq $comparable_ip } @addresses;
if ($is_in_routing_table) {
undef $add_timer;
$cb->($true);
}
};
}
sub del_route {
my ($ip_address, $cb) = @_;
my $attempt = 0;
my $sleep = 10;
my $delete_timer; $delete_timer = AE::timer 0, $sleep, sub {
if ($attempt++ >= 3) {
undef $delete_timer;
$cb->($false);
}
print "$attempt. Deleting Route $ip_address\n";
my @addresses = get_routing_table();
my ($comparable_ip) = $ip_address =~ /^($RE{net}{IPv4})\/32/;
my $is_in_routing_table = first { $_->{'ip_address'} eq $comparable_ip } @addresses;
if (not $is_in_routing_table) {
undef $delete_timer;
$cb->($true);
}
};
}
sub get_routing_table {
#my @routing_table = `ip ro`;
my @routing_table = (
'127.0.0.0/8 dev lo proto kernel scope link src 127.0.0.1',
'127.0.0.11 via 10.0.0.11 dev eth0 proto baba',
);
my @ret_val;
foreach my $line (@routing_table) {
my ($ip_address, $next_hop) = $line =~ /^($RE{net}{IPv4}) via ($RE{net}{IPv4}) .*proto baba$/;
if (defined ($ip_address) and defined ($next_hop)) {
push @ret_val, { ip_address => $ip_address, next_hop => $next_hop };
}
}
return @ret_val;
}
交叉张贴在PerlMonks