@@ -1,5 +1,9 @@
Revision history for Perl extension AnyEvent::RabbitMQ
+1.16 Sat Apr 12 14:42:00 BST 2014
+ - Doc fixes (Mark Ellis)
+ - Fix leak when calling ->close + tests (Peter Haworth)
+
1.15 Mon Jul 1 12:35:00 BST 2013
- Fix paper-bag bug in connection close - calling nonexistent method.
@@ -26,4 +26,5 @@ xt/02_perlcritic.t
xt/03_pod.t
xt/04_anyevent.t
xt/05_multi_channel.t
+xt/06_close.t
xt/perlcriticrc
@@ -35,4 +35,4 @@ requires:
resources:
license: http://dev.perl.org/licenses/
repository: git://github.com/bobtfish/AnyEvent-RabbitMQ.git
-version: 1.15
+version: 1.16
@@ -18,7 +18,7 @@ use constant {
_ST_OPEN => 2,
};
-our $VERSION = '1.15';
+our $VERSION = '1.16';
sub new {
my $class = shift;
@@ -3,7 +3,7 @@ package AnyEvent::RabbitMQ::LocalQueue;
use strict;
use warnings;
-our $VERSION = '1.15';
+our $VERSION = '1.16';
sub new {
my $class = shift;
@@ -32,7 +32,7 @@ use AnyEvent::RabbitMQ::LocalQueue;
use namespace::clean;
-our $VERSION = '1.15';
+our $VERSION = '1.16';
use constant {
_ST_CLOSED => 0,
@@ -421,7 +421,7 @@ sub close {
my $self = shift;
my %args = $self->_set_cbs(@_);
- if (!$self->{_state} == _ST_CLOSED) {
+ if ($self->{_state} == _ST_CLOSED) {
$args{on_success}->(@_);
return $self;
}
@@ -459,7 +459,7 @@ sub _finish_close {
my $self = shift;
my %args = @_;
- if (my @ch = map { $_->id } grep { defined() && $_->is_open } keys %{$self->{_channels}}) {
+ if (my @ch = map { $_->id } grep { defined() && $_->is_open } values %{$self->{_channels}}) {
$args{on_failure}->("BUG: closing with channel(s) open: @ch");
return;
}
@@ -665,6 +665,7 @@ AnyEvent::RabbitMQ - An asynchronous and multi channel Perl AMQP client.
tls => 0, # Or 1 if you'd like SSL
tune => { heartbeat => 30, channel_max => $whatever, frame_max = $whatever },
on_success => sub {
+ my $ar = shift;
$ar->open_channel(
on_success => sub {
my $channel = shift;
@@ -680,7 +681,7 @@ AnyEvent::RabbitMQ - An asynchronous and multi channel Perl AMQP client.
on_close => sub {
my $method_frame = shift->method_frame;
die $method_frame->reply_code, $method_frame->reply_text;
- }
+ },
);
},
on_failure => $cv,
@@ -688,7 +689,7 @@ AnyEvent::RabbitMQ - An asynchronous and multi channel Perl AMQP client.
on_return => sub {
my $frame = shift;
die "Unable to deliver ", Dumper($frame);
- }
+ },
on_close => sub {
my $why = shift;
if (ref($why)) {
@@ -0,0 +1,109 @@
+use Test::More;
+use Test::Exception;
+
+my %conf = (
+ host => 'localhost',
+ port => 5672,
+ user => 'guest',
+ pass => 'guest',
+ vhost => '/',
+);
+
+eval {
+ use IO::Socket::INET;
+
+ my $socket = IO::Socket::INET->new(
+ Proto => 'tcp',
+ PeerAddr => $conf{host},
+ PeerPort => $conf{port},
+ Timeout => 1,
+ ) or die 'Error connecting to AMQP Server!';
+
+ close $socket;
+};
+
+plan skip_all => 'Connection failure: '
+ . $conf{host} . ':' . $conf{port} if $@;
+plan tests => 2;
+
+use AnyEvent::RabbitMQ;
+
+subtest 'No channels', sub {
+ my $ar = connect_ar();
+ ok $ar->is_open, 'connection is open';
+ is channel_count($ar), 0, 'no channels open';
+
+ close_ar($ar);
+ ok !$ar->is_open, 'connection closed';
+ is channel_count($ar), 0, 'no channels open';
+};
+
+subtest 'channels', sub {
+ my $ar = connect_ar();
+ ok $ar->is_open, 'connection is open';
+ is channel_count($ar), 0, 'no channels open';
+
+ my $ch = open_channel($ar);
+ ok $ch->is_open, 'channel is open';
+ is channel_count($ar), 1, 'no channels open';
+
+ close_ar($ar);
+ ok !$ar->is_open, 'connection closed';
+ is channel_count($ar), 0, 'no channels open';
+ ok !$ch->is_open, 'channel closed';
+};
+
+sub connect_ar {
+ my $done = AnyEvent->condvar;
+ my $ar = AnyEvent::RabbitMQ->new()->load_xml_spec()->connect(
+ (map {$_ => $conf{$_}} qw(host port user pass vhost)),
+ timeout => 1,
+ on_success => sub {$done->send(1)},
+ on_failure => sub { diag @_; $done->send()},
+ on_close => \&handle_close,
+ );
+ die 'Connection failure' if !$done->recv;
+ return $ar;
+}
+
+sub close_ar {
+ my ($ar,) = @_;
+
+ my $done = AnyEvent->condvar;
+ $ar->close(
+ on_success => sub {$done->send(1)},
+ on_failure => sub { diag @_; $done->send()},
+ );
+ die 'Close failure' if !$done->recv;
+
+ return;
+}
+
+sub channel_count {
+ my ($ar,) = @_;
+
+ return scalar keys %{$ar->channels};
+}
+
+sub open_channel {
+ my ($ar,) = @_;
+
+ my $done = AnyEvent->condvar;
+ $ar->open_channel(
+ on_success => sub {$done->send(shift)},
+ on_failure => sub {$done->send()},
+ on_return => sub {die 'Receive return'},
+ on_close => \&handle_close,
+ );
+ my $ch = $done->recv;
+ die 'Open channel failure' if !$ch;
+
+ return $ch;
+}
+
+sub handle_close {
+ my $method_frame = shift->method_frame;
+ die $method_frame->reply_code, $method_frame->reply_text
+ if $method_frame->reply_code;
+}
+