
I need to write a script that fetches some URLs in parallel and does some work with them. In the past I have always used Parallel::ForkManager for this kind of thing, but now I'd like to learn something new and try asynchronous programming with AnyEvent (and AnyEvent::HTTP or AnyEvent::Curl::Multi)... but I'm having trouble understanding AnyEvent and writing a script that should:

  • open a file (every line holds a separate URL)
  • (from this point on, work in parallel, but with a limit of e.g. 10 concurrent requests)
  • read the file line by line (I don't want to load the whole file into memory; it could be huge)
  • make an HTTP request for that URL
  • read the response
  • update the corresponding MySQL record
  • (next file line)

I have read many manuals and tutorials, but I still have trouble understanding the difference between blocking and non-blocking code. I found a similar script at http://perlmaven.com/fetching-several-web-pages-in-parallel-using-anyevent, where Mr. Szabo explains the basics, but I still can't figure out how to implement something like:

...
open my $fh, "<", $file;
while ( my $line = <$fh> )
{
# http request, read response, update MySQL
}
close $fh;
...

...and how to add a concurrency limit in that scenario.
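To show where I get stuck, here is the furthest I got on my own: a rough, untested sketch using AnyEvent::HTTP, with a plain counter to cap concurrency (urls.txt and the MySQL part are placeholders):

use strict;
use warnings;
use AnyEvent;
use AnyEvent::HTTP;

my $max_concurrent = 10;
my $active         = 0;
my $cv             = AnyEvent->condvar;

open my $fh, '<', 'urls.txt' or die "can't open urls.txt: $!";

sub start_next {
    # refill the pool until the limit is hit or the file runs out
    while ( $active < $max_concurrent && defined( my $url = <$fh> ) ) {
        chomp $url;
        $active++;
        $cv->begin;
        http_get $url, sub {
            my ( $body, $hdr ) = @_;
            # read response, update MySQL here
            $active--;
            start_next();    # refill before ->end so the count never hits zero early
            $cv->end;
        };
    }
}

$cv->begin;      # guard so recv can't fire before the first request starts
start_next();
$cv->end;
$cv->recv;       # enter the event loop; returns when every request is done

Is something along these lines the right pattern, or does the refill loop block the event loop?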

I would really appreciate your help ;)

UPDATE

Following ikegami's advice, I gave Net::Curl::Multi a try. I'm very happy with the results. After years of using Parallel::ForkManager for nothing more than concurrently grabbing thousands of URLs, Net::Curl::Multi seems great. Here is my code, with a while loop over the file handle. It seems to work as it should, but since this is the first time I've written anything like this, I'd like to ask more experienced Perl users to take a look and tell me whether there are any potential bugs, something I've missed, and so on. Also, if I may ask: since I don't fully understand how Net::Curl::Multi's concurrency works, please tell me whether I should expect any problems with putting a MySQL UPDATE command (via DBI) inside the RESPONSE loop (apart from higher server load, obviously; I expect the final script to run about 50 concurrent N::C::M workers, maybe more). There's a sketch of what I mean right after the code below.

#!/usr/bin/perl

use strict;
use warnings;

use Net::Curl::Easy  qw( :constants );
use Net::Curl::Multi qw( );

sub make_request {
    my ( $url ) = @_;
    my $easy = Net::Curl::Easy->new();
    $easy->{url} = $url;
    $easy->setopt( CURLOPT_URL,        $url );
    $easy->setopt( CURLOPT_HEADERDATA, \$easy->{head} );
    $easy->setopt( CURLOPT_FILE,       \$easy->{body} );
    return $easy;
}

my $maxWorkers = 10;

my $multi = Net::Curl::Multi->new();
my $workers = 0;

my $i = 1;
open my $fh, "<", "urls.txt" or die "can't open urls.txt: $!";
LINE: while ( my $url = <$fh> )
{
    chomp( $url );
    $url .= "?$i";
    print "($i) $url\n";
    my $easy = make_request( $url );
    $multi->add_handle( $easy );
    $workers++;

    my $running = 0;
    do {
        my ($r, $w, $e) = $multi->fdset();
        my $timeout = $multi->timeout();
        select( $r, $w, $e, $timeout / 1000 )
            if $timeout > 0;

        $running = $multi->perform();
        RESPONSE: while ( my ( $msg, $easy, $result ) = $multi->info_read() ) {
            $multi->remove_handle( $easy );
            $workers--;
            printf( "%s getting %s\n", $easy->getinfo( CURLINFO_RESPONSE_CODE ), $easy->{url} );
        }

        # don't max out the CPU while waiting
        select( undef, undef, undef, 0.01 );
        # block while the worker pool is full; once the file is exhausted,
        # keep going until the remaining transfers have drained
    } while ( $workers == $maxWorkers || ( eof && $running ) );
    $i++;
}
close $fh;
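To make the DBI part of my question concrete, this is roughly what I mean (a sketch only; the connection details and the urls table layout are made up, and the handle is created once, before the LINE loop, since everything runs in a single process):

use DBI;

# connect once, before the LINE loop (single process, one handle is enough)
my $dbh = DBI->connect(
    'DBI:mysql:database=mydb;host=localhost',
    'user', 'password',
    { RaiseError => 1, AutoCommit => 1 },
);
my $sth = $dbh->prepare( 'UPDATE urls SET status = ?, body = ? WHERE url = ?' );

# ... then inside the RESPONSE loop, right after remove_handle():
$sth->execute(
    $easy->getinfo( CURLINFO_RESPONSE_CODE ),
    $easy->{body},
    $easy->{url},
);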

2 Answers


Net::Curl is a rather nice library, and it's extremely fast. Furthermore, it can handle parallel requests too! I would recommend it instead of AnyEvent.

use Net::Curl::Easy  qw( :constants );
use Net::Curl::Multi qw( );

sub make_request {
    my ( $url ) = @_;
    my $easy = Net::Curl::Easy->new();
    $easy->{url} = $url;
    $easy->setopt( CURLOPT_URL,        $url );
    $easy->setopt( CURLOPT_HEADERDATA, \$easy->{head} );
    $easy->setopt( CURLOPT_FILE,       \$easy->{body} );
    return $easy;
}

my $max_running = 10;
my @urls = ( 'http://www.google.com/' );

my $multi = Net::Curl::Multi->new();
my $running = 0;
while (1) {
    # top up the pool until the concurrency limit is reached
    while ( @urls && $running < $max_running ) {
        my $easy = make_request( shift( @urls ) );
        $multi->add_handle( $easy );
        ++$running;
    }

    # nothing queued and nothing in flight: all done
    last if !$running;

    my ( $r, $w, $e ) = $multi->fdset();
    my $timeout = $multi->timeout();
    select( $r, $w, $e, $timeout / 1000 )
        if $timeout > 0;

    $running = $multi->perform();
    # reap completed transfers and report their status codes
    while ( my ( $msg, $easy, $result ) = $multi->info_read() ) {
        $multi->remove_handle( $easy );
        printf( "%s getting %s\n", $easy->getinfo( CURLINFO_RESPONSE_CODE ), $easy->{url} );
    }
}
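Since the question wants the URLs read line by line rather than preloaded into an array, the refill loop above can draw from a file handle instead. A minimal variation (assuming the urls.txt file from the question; the fdset/select/perform/info_read body stays exactly as above):

open my $fh, '<', 'urls.txt' or die "can't open urls.txt: $!";

my $multi = Net::Curl::Multi->new();
my $running = 0;
while (1) {
    # top up the pool straight from the file handle
    while ( $running < $max_running && defined( my $url = <$fh> ) ) {
        chomp $url;
        my $easy = make_request( $url );
        $multi->add_handle( $easy );
        ++$running;
    }

    last if !$running;    # file exhausted and nothing in flight

    # ... same fdset/select/perform/info_read body as above ...
}
close $fh;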
Answered 2016-04-27T19:39:52.843

This does exactly what you want, in an asynchronous fashion, and it does so by wrapping Net::Curl in a safe way:

#!/usr/bin/env perl

package MyDownloader;
use strict;
use warnings qw(all);

use Moo;

extends 'YADA::Worker';

# tweak defaults inherited from YADA::Worker: collect stats, retry up to 10 times
has '+use_stats'=> (default => sub { 1 });
has '+retry'    => (default => sub { 10 });

after init => sub {
    my ($self) = @_;

    $self->setopt(
        encoding            => '',  # empty string: accept any encoding curl supports
        verbose             => 1,
    );
};

after finish => sub {
    my ($self, $result) = @_;

    if ($self->has_error) {
        print "ERROR: $result\n";
    } else {
        # do the interesting stuff here
        printf "Finished downloading %s: %d bytes\n", $self->final_url, length ${$self->data};
    }
};

# additionally treat HTTP 5xx responses as errors, so they get retried
around has_error => sub {
    my $orig = shift;
    my $self = shift;

    return 1 if $self->$orig(@_);
    return 1 if $self->getinfo('response_code') =~ m{^5[0-9]{2}$}x;
};

1;

package main;
use strict;
use warnings qw(all);

use Carp;

use YADA;

my $q = YADA->new(
    max     => 8,
    timeout => 30,
);

open(my $fh, '<', 'file_with_urls_per_line.txt')
    or croak "can't open queue: $!";
while (my $url = <$fh>) {
    chomp $url;

    $q->append(sub {
        MyDownloader->new($url)
    });
}
close $fh;
$q->wait;
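To tie this back to the MySQL part of the question: the natural place for the UPDATE is the success branch of the after finish hook. A sketch only; the urls table and the package-level $dbh (created once with DBI->connect) are made up:

# inside MyDownloader's "after finish" hook, replacing the printf:
my $code = $self->getinfo('response_code');
$dbh->do(
    'UPDATE urls SET status = ?, length = ? WHERE url = ?',
    undef,
    $code,
    length ${ $self->data },
    $self->final_url,
);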
Answered 2016-04-28T10:45:02.010