0

我正在尝试制作一个队列管理器,以便在特定文件夹中创建文件时获取作业。我使用 AnyEvent 创建了我的代码,所以它是异步的。我的问题是,我正在尝试使用回调从子程序 add_route 和 del_route 传递返回值,但是 AE::timer 不会停止,并且回调获取的值不会保存在变量中$return_code。我哪里出错了?

#!/usr/bin/perl
use strict;
use warnings;

use AnyEvent;
use AnyEvent::Filesys::Notify;
use Const::Fast;
use DDP;
use File::Basename;
use File::Copy;
use File::Slurp;
use FindBin '$Bin';
use List::Util qw(first);
use Regexp::Common qw(net);
use v5.10.1;

const my $true  => 1;
const my $false => 0;

my $cv = AE::cv;
my $jobs_folder_path    = $Bin . '/jobs';
my $interval = 5;
my $after    = 10;

my %jobs_folders        = (
    "new"       => "$jobs_folder_path/new",
    "progress"  => "$jobs_folder_path/progress",
    "failed"    => "$jobs_folder_path/failed",
);

my $notifier = AnyEvent::Filesys::Notify->new(
    dirs        => [ $jobs_folders{'new'} ],
    interval    => $interval,
    cb          => sub {
        my (@events) = @_;

        for my $event (@events) {
            if ($event->is_created) {
                process_new_job($event->path);
            }
        }
    }
);

my $timer = AE::timer $after, $interval, sub {
    my @files = read_dir($jobs_folders{'progress'}, prefix => $true);

    if (@files) {
        foreach my $file (@files) {
            my $file_name   = basename($file);
            my $line        = read_file($file);

            for ($file_name) {
                when (/add/) {
                    my ($ip_address, $next_hop) = split(/ /, $line);
                    my $return_code;
                    my $cb = sub {
                        my $ret_val =  shift;

                        $return_code = $ret_val;
                    };

                    add_route($ip_address, $next_hop, $cb);

                    print $return_code;
                    #post_job_process($return_code, $file_name);
                }
                when (/del/) {
                    my ($ip_address) = $line;
                    my $return_code;
                    my $cb = sub {
                        my $ret_val =  shift;

                        $return_code = $ret_val;
                    };

                    del_route($ip_address, $cb);

                    print $return_code, "\n";

                    #post_job_process($return_code, $file_name);
                }
            }
        }
    }
};

$cv->recv;

sub process_new_job {
    my ($new_job) = shift;

    my $file_name = basename($new_job);
    move("$jobs_folders{'new'}/$file_name", "$jobs_folders{'progress'}/$file_name");
}

sub post_job_process {
    my ($return_code, $file_name) = @_;

    if ($return_code == $false) {
        move("$jobs_folders{'progress'}/$file_name", "$jobs_folders{'failed'}/$file_name");
        send_email();
    }

}

sub send_email {
    print "Sending Email...\n";
}

sub add_route {
    my ($ip_address, $next_hop, $cb) = @_;

    my $attempt = 0;
    my $sleep   = 10;

    my $add_timer; $add_timer = AE::timer 0, $sleep, sub {

        if ($attempt++ >= 3) {
            undef $add_timer;
            $cb->($false);
        }

        print "$attempt. Adding Route $ip_address via $next_hop\n";

        my @addresses = get_routing_table();

        my ($comparable_ip) = $ip_address =~ /($RE{net}{IPv4})\/32$/;
        my $is_in_routing_table = first { $_->{'ip_address'} eq $comparable_ip } @addresses;

        if ($is_in_routing_table) {
            undef $add_timer;
            $cb->($true);
        }
    };
}

sub del_route {
    my ($ip_address, $cb) = @_;

    my $attempt = 0;
    my $sleep   = 10;

    my $delete_timer; $delete_timer = AE::timer 0, $sleep, sub {

        if ($attempt++ >= 3) {
            undef $delete_timer;
            $cb->($false);
        }

        print "$attempt. Deleting Route $ip_address\n";

        my @addresses = get_routing_table();

        my ($comparable_ip) = $ip_address =~ /^($RE{net}{IPv4})\/32/;
        my $is_in_routing_table = first { $_->{'ip_address'} eq $comparable_ip } @addresses;

        if (not $is_in_routing_table) {
            undef $delete_timer;
            $cb->($true);
        }
    };
}

sub get_routing_table {
    #my @routing_table = `ip ro`;
    my @routing_table = (
      '127.0.0.0/8 dev lo  proto kernel  scope link  src 127.0.0.1',
      '127.0.0.11 via 10.0.0.11 dev eth0  proto baba',
    );
    my @ret_val;

    foreach my $line (@routing_table) {
        my ($ip_address, $next_hop) = $line =~ /^($RE{net}{IPv4}) via ($RE{net}{IPv4}) .*proto baba$/;
        if (defined ($ip_address) and defined ($next_hop)) {
            push @ret_val, { ip_address => $ip_address, next_hop => $next_hop };
        }
    }

    return @ret_val;
}

交叉张贴在PerlMonks

4

1 回答 1

1

在继续并打印返回码之前,您无需等待异步代码回调完成并实际设置值。例如,您需要在创建 add_route 和打印等待其完成的返回码之间有一个 condvar。类似于(尽管未经测试)的东西:

            when (/add/) {
                my ($ip_address, $next_hop) = split(/ /, $line);
                my $done = AE:cv;
                my $cb = sub {
                    my $ret_val =  shift;

                    $done->send($ret_val);
                };

                add_route($ip_address, $next_hop, $cb);

                my $return_code = $done->recv; ### Wait for the callback to finish...

                print $return_code;
                #post_job_process($return_code, $file_name);
            }

编辑:使用 Object::Event 作为基类确实帮助我在学习 AnyEvent 时弄清楚如何将所有内容联系在一起。

于 2013-06-19T16:24:22.233 回答