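I am writing a master program that publishes messages to a message queue (RabbitMQ). The program is written in Perl 5 and uses AnyEvent::RabbitMQ to communicate with RabbitMQ.
The following minimal example (of the problem I ran into) fails when the second command is sent over the same channel, with the error "Channel closed".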
use strictures 2;
use AnyEvent::RabbitMQ;
main();
############################################################################
sub main {
    _log( debug => 'main' );
    my $condvar = AnyEvent->condvar;
    my $ar      = AnyEvent::RabbitMQ->new;
    $ar->load_xml_spec;
    _log( debug => 'Connecting to RabbitMQ...' );
    $ar->connect(
        host       => 'localhost',
        port       => 5672,
        user       => 'guest',
        pass       => 'guest',
        vhost      => '/',
        timeout    => 1,
        tls        => 0,
        on_success => sub { _on_connect_success( $condvar, $ar, @_ ) },
        on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
        on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
        on_return  => sub { _error( $condvar, $ar, 'return', @_ ) },
        on_close   => sub { _error( $condvar, $ar, 'close', @_ ) },
    );
    $condvar->recv;
    $ar->close;
    return;
}
############################################################################
sub _on_connect_success {
    my ( $condvar, $ar, $new_ar ) = @_;
    _log( debug => 'Connected to RabbitMQ.' );
    _open_channel( $condvar, $new_ar );
    return;
}
############################################################################
sub _open_channel {
    my ( $condvar, $ar ) = @_;
    _log( debug => 'Opening RabbitMQ channel...' );
    $ar->open_channel(
        on_success => sub { _on_open_channel_success( $condvar, $ar, @_ ) },
        on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
        on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
        on_return  => sub { _error( $condvar, $ar, 'return', @_ ) },
        on_close   => sub { _error( $condvar, $ar, 'close', @_ ) },
    );
    return;
}
############################################################################
sub _on_open_channel_success {
    my ( $condvar, $ar, $channel ) = @_;
    _log( debug => 'Opened RabbitMQ channel.' );
    _declare_queue( $condvar, $ar, $channel );
    return;
}
############################################################################
sub _declare_queue {
    my ( $condvar, $ar, $channel ) = @_;
    _log( debug => 'Declaring RabbitMQ queue...' );
    $channel->declare_queue(
        queue       => 'test',
        auto_delete => 1,
        passive     => 0,
        durable     => 0,
        exclusive   => 0,
        no_ack      => 1,
        ticket      => 0,
        on_success  =>
          sub { _on_declare_queue_success( $condvar, $ar, $channel, @_ ) },
        on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
        on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
        on_return  => sub { _error( $condvar, $ar, 'return', @_ ) },
        on_close   => sub { _error( $condvar, $ar, 'close', @_ ) },
    );
    return;
}
############################################################################
sub _on_declare_queue_success {
    my ( $condvar, $ar, $channel ) = @_;
    _log( debug => 'Declared RabbitMQ queue.' );
    _bind_queue( $condvar, $ar, $channel );
    return;
}
############################################################################
sub _bind_queue {
    my ( $condvar, $ar, $channel ) = @_;
    _log( debug => 'Binding RabbitMQ queue...' );
    $channel->bind_queue(
        queue       => 'test',
        exchange    => '',
        routing_key => '',
        on_success => sub { _on_bind_queue_success( $condvar, $ar, $channel, @_ ) },
        on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
        on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
        on_return  => sub { _error( $condvar, $ar, 'return', @_ ) },
        on_close   => sub { _error( $condvar, $ar, 'close', @_ ) },
    );
    return;
}
############################################################################
sub _on_bind_queue_success {
    my ( $condvar, $ar, $channel ) = @_;
    _log( debug => 'Bound RabbitMQ queue.' );
    _log( info  => 'Master ready to publish messages.' );
    _publish_message( $condvar, $ar, $channel, 'Hello, world!' );
    return;
}
############################################################################
sub _publish_message {
    my ( $condvar, $ar, $channel, $message ) = @_;
    _log( debug => "Publishing RabbitMQ message ($message)..." );
    $channel->publish(
        queue       => 'test',
        exchange    => '',
        routing_key => '',
        body        => $message,
        header      => {},
        mandatory   => 0,
        immediate   => 0,
        on_success  =>
          sub { _on_publish_message_success( $condvar, $ar, $channel, @_ ) },
        on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
        on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
        on_return  => sub { _error( $condvar, $ar, 'return', @_ ) },
        on_close   => sub { _error( $condvar, $ar, 'close', @_ ) },
        on_ack     => sub { _error( $condvar, $ar, 'ack', @_ ) },
        on_nack    => sub { _error( $condvar, $ar, 'nack', @_ ) },
    );
    return;
}
############################################################################
sub _on_publish_message_success {
    my ( $condvar, $ar, $channel ) = @_;
    _log( debug => "Published RabbitMQ message." );
    sleep 1;
    _publish_message( $condvar, $ar, $channel, 'Hello, world! Again ' . time );
    return;
}
############################################################################
sub _error {
    my ( $condvar, $ar, $type, @error ) = @_;
    _log( error => sprintf '%s - %s', $type, join ', ', @error );
    $condvar->send( $condvar, $ar, $type, @error );
    return;
}
############################################################################
sub _log {
    my ( $level, $message ) = @_;
    my @time = gmtime time;
    $time[5] += 1900;
    $time[4] += 1;
    my $time = sprintf '%04d-%02d-%02dT%02d:%02d:%02d+00:00',
      @time[ 5, 4, 3, 2, 1, 0 ];
    my @caller0    = caller(0);
    my @caller1    = caller(1);
    my $subroutine = $caller1[3];
    $subroutine =~ s/^$caller0[0]:://;
    print STDERR "$time [$level] $message at $caller0[1] line $caller0[2] ($subroutine; from $caller1[1] line $caller1[2])\n";
    return;
}
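The program should:
- connect to RabbitMQ
- open a RabbitMQ channel
- declare a simple queue (named "test")
- bind to that queue (named "test")
- publish a message ("Hello, world!")
- after the message has been published successfully, wait a moment and then publish another message
This program (the master) should not consume messages. Other programs do that job.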
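As an aside on the "wait a moment" step: the example uses a blocking sleep inside a callback, which stalls the event loop while it waits. A minimal sketch of a non-blocking alternative with an AnyEvent timer might look like the following. The helper publish_later and the @sent array are hypothetical stand-ins for the real publish call, so the sketch runs without a broker.

```perl
use strict;
use warnings;
use AnyEvent;

my $condvar = AnyEvent->condvar;
my @sent;     # hypothetical stand-in for messages handed to the channel
my $timer;    # the guard must stay referenced, or the timer is cancelled

# Hypothetical helper: run the "publish" step after $delay seconds
# without blocking the event loop, then call $then.
sub publish_later {
    my ( $delay, $message, $then ) = @_;
    $timer = AnyEvent->timer(
        after => $delay,
        cb    => sub {
            undef $timer;              # one-shot: release the guard
            push @sent, $message;      # stand-in for $channel->publish(...)
            $then->();
        },
    );
    return;
}

publish_later( 0.1, 'Hello, world! Again', sub { $condvar->send } );
$condvar->recv;
print "sent: @sent\n";
```

In the real program the callback would call _publish_message instead of pushing onto @sent.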
The minimal example (see above) produces the following output:
2015-08-12T13:02:07+00:00 [debug] main at minimal.pl line 9 (main; from minimal.pl line 5)
2015-08-12T13:02:07+00:00 [debug] Connecting to RabbitMQ... at minimal.pl line 13 (main; from minimal.pl line 5)
2015-08-12T13:02:07+00:00 [debug] Connected to RabbitMQ. at minimal.pl line 36 (_on_connect_success; from minimal.pl line 22)
2015-08-12T13:02:07+00:00 [debug] Opening RabbitMQ channel... at minimal.pl line 44 (_open_channel; from minimal.pl line 37)
2015-08-12T13:02:07+00:00 [debug] Opened RabbitMQ channel. at minimal.pl line 58 (_on_open_channel_success; from minimal.pl line 46)
2015-08-12T13:02:07+00:00 [debug] Declaring RabbitMQ queue... at minimal.pl line 66 (_declare_queue; from minimal.pl line 59)
2015-08-12T13:02:07+00:00 [debug] Declared RabbitMQ queue. at minimal.pl line 88 (_on_declare_queue_success; from minimal.pl line 76)
2015-08-12T13:02:07+00:00 [debug] Binding RabbitMQ queue... at minimal.pl line 96 (_bind_queue; from minimal.pl line 89)
2015-08-12T13:02:07+00:00 [error] failure - Channel closed at minimal.pl line 155 (_error; from minimal.pl line 102)
2015-08-12T13:02:07+00:00 [error] close - Net::AMQP::Frame::Method=HASH(0x38fe1c8) at minimal.pl line 155 (_error; from minimal.pl line 50)
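The last line only shows a stringified object, which hides the reason the broker gave. A minimal sketch of how _error could unpack it, assuming the object passed to on_close offers method_frame, and that the result offers reply_code and reply_text, as the on_close example in the AnyEvent::RabbitMQ synopsis suggests. The helpers describe_item and describe_error and the Mock:: packages are hypothetical; the mocks exist only so the sketch runs without a broker, and their values are made up.

```perl
use strict;
use warnings;
use Scalar::Util qw(blessed);

# Hypothetical helper: render one callback argument as readable text.
sub describe_item {
    my ($item) = @_;
    return '(undef)' unless defined $item;

    # Plain strings and objects without method_frame are stringified as-is.
    return "$item" unless blessed($item) && $item->can('method_frame');

    my $method = $item->method_frame;
    return ref $item
      unless blessed($method)
      && $method->can('reply_code')
      && $method->can('reply_text');
    return sprintf '%s: %s', $method->reply_code, $method->reply_text;
}

# Hypothetical helper: join all callback arguments, as _error does today.
sub describe_error { return join ', ', map { describe_item($_) } @_ }

# Stand-in objects with made-up values, for demonstration only.
{
    package Mock::Method;
    sub new        { return bless {}, shift }
    sub reply_code { return 0 }
    sub reply_text { return 'example reply text' }
}
{
    package Mock::Frame;
    sub new          { return bless {}, shift }
    sub method_frame { return Mock::Method->new }
}

print describe_error( 'close', Mock::Frame->new ), "\n";
```

Inside _error, the join over @error could then be replaced by describe_error(@error), so the log would carry the broker's reply code and text instead of a HASH address.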
Why does AnyEvent::RabbitMQ, or RabbitMQ itself, close the channel (not the connection)? Or am I missing something?