@@ -1,4 +1,11 @@
Revision history for Perl module Net::Stomp:
+0.51 2015-01-26
+ - fix and document return values of all methods
+ - add global read timeout attribute
+
+0.50 2015-01-26
+ - extract send_with_receipt from inside send_transactional
+
0.49 2014-07-14
- allow setting SSL per-server
- expose the currently connected host
@@ -7,8 +7,6 @@ lib/Net/Stomp/Frame.pm
lib/Net/Stomp/StupidLogger.pm
MANIFEST This list of files
MANIFEST.SKIP
-META.json
-META.yml
README
t/connect-disconnect.t
t/constructor.t
@@ -21,6 +19,9 @@ t/pod_coverage.t
t/receiving-frames.t
t/reconnect.t
t/send-transactional.t
+t/send-with-receipt.t
t/sending-frames.t
xt/consume_buffering.t
Makefile.PL
+META.yml
+META.json
@@ -7,7 +7,7 @@
"Ash Berlin <ash_github@firemirror.com>"
],
"dynamic_config" : 1,
- "generated_by" : "Module::Build version 0.4205",
+ "generated_by" : "Module::Build version 0.421",
"license" : [
"perl_5"
],
@@ -44,15 +44,15 @@
"provides" : {
"Net::Stomp" : {
"file" : "lib/Net/Stomp.pm",
- "version" : "0.49"
+ "version" : "0.51"
},
"Net::Stomp::Frame" : {
"file" : "lib/Net/Stomp/Frame.pm",
- "version" : "0.49"
+ "version" : "0.51"
},
"Net::Stomp::StupidLogger" : {
"file" : "lib/Net/Stomp/StupidLogger.pm",
- "version" : "0.49"
+ "version" : "0.51"
}
},
"release_status" : "stable",
@@ -61,5 +61,5 @@
"http://dev.perl.org/licenses/"
]
},
- "version" : "0.49"
+ "version" : "0.51"
}
@@ -12,7 +12,7 @@ build_requires:
configure_requires:
Module::Build: '0.42'
dynamic_config: 1
-generated_by: 'Module::Build version 0.4205, CPAN::Meta::Converter version 2.140640'
+generated_by: 'Module::Build version 0.421, CPAN::Meta::Converter version 2.143240'
license: perl
meta-spec:
url: http://module-build.sourceforge.net/META-spec-v1.4.html
@@ -21,13 +21,13 @@ name: Net-Stomp
provides:
Net::Stomp:
file: lib/Net/Stomp.pm
- version: '0.49'
+ version: '0.51'
Net::Stomp::Frame:
file: lib/Net/Stomp/Frame.pm
- version: '0.49'
+ version: '0.51'
Net::Stomp::StupidLogger:
file: lib/Net/Stomp/StupidLogger.pm
- version: '0.49'
+ version: '0.51'
recommends:
IO::Socket::IP: '0.20'
IO::Socket::SSL: '1.75'
@@ -37,4 +37,4 @@ requires:
IO::Socket::INET: '0'
resources:
license: http://dev.perl.org/licenses/
-version: '0.49'
+version: '0.51'
@@ -1,4 +1,4 @@
-# Note: this file was auto-generated by Module::Build::Compat version 0.4205
+# Note: this file was auto-generated by Module::Build::Compat version 0.4210
use ExtUtils::MakeMaker;
WriteMakefile
(
@@ -2,7 +2,7 @@ package Net::Stomp::Frame;
use strict;
use warnings;
-our $VERSION='0.49';
+our $VERSION='0.51';
use base 'Class::Accessor::Fast';
__PACKAGE__->mk_accessors(qw(command headers body));
@@ -3,7 +3,7 @@ use strict;
use warnings;
use Carp;
-our $VERSION = '0.49';
+our $VERSION = '0.51';
sub new {
my ($class,$levels) = @_;
@@ -6,13 +6,13 @@ use Net::Stomp::Frame;
use Carp qw(longmess);
use base 'Class::Accessor::Fast';
use Net::Stomp::StupidLogger;
-our $VERSION = '0.49';
+our $VERSION = '0.51';
__PACKAGE__->mk_accessors( qw(
current_host failover hostname hosts port select serial session_id socket ssl
ssl_options subscriptions _connect_headers bufsize
reconnect_on_fork logger connect_delay
- reconnect_attempts initial_reconnect_attempts
+ reconnect_attempts initial_reconnect_attempts timeout
) );
sub _logconfess {
@@ -136,11 +136,14 @@ sub _get_socket {
my ($self) = @_;
my $socket;
+ my $timeout = $self->timeout;
+ $timeout = 5 unless defined $timeout;
+
my %sockopts = (
PeerAddr => $self->hostname,
PeerPort => $self->port,
Proto => 'tcp',
- Timeout => 5
+ Timeout => $timeout,
);
if ( $self->ssl ) {
eval { require IO::Socket::SSL };
@@ -189,6 +192,7 @@ sub disconnect {
my $frame = Net::Stomp::Frame->new( { command => 'DISCONNECT' } );
$self->send_frame($frame);
$self->_close_socket;
+ return 1;
}
sub _reconnect {
@@ -219,7 +223,7 @@ sub can_read {
}
$conf ||= {};
- my $timeout = exists $conf->{timeout} ? $conf->{timeout} : undef;
+ my $timeout = exists $conf->{timeout} ? $conf->{timeout} : $self->timeout;
return $self->select->can_read($timeout) || 0;
}
@@ -230,28 +234,16 @@ sub send {
my $frame = Net::Stomp::Frame->new(
{ command => 'SEND', headers => $conf, body => $body } );
$self->send_frame($frame);
+ return 1;
}
-sub send_transactional {
+sub send_with_receipt {
my ( $self, $conf ) = @_;
- my $body = $conf->{body};
- delete $conf->{body};
-
- # begin the transaction
- my $transaction_id = $self->_get_next_transaction;
- my $begin_frame
- = Net::Stomp::Frame->new(
- { command => 'BEGIN', headers => { transaction => $transaction_id } }
- );
- $self->send_frame($begin_frame);
# send the message
my $receipt_id = $self->_get_next_transaction;
$conf->{receipt} = $receipt_id;
- $conf->{transaction} = $transaction_id;
- my $message_frame = Net::Stomp::Frame->new(
- { command => 'SEND', headers => $conf, body => $body } );
- $self->send_frame($message_frame);
+ $self->send($conf);
# check the receipt
my $receipt_frame = $self->receive_frame;
@@ -264,7 +256,32 @@ sub send_transactional {
&& $receipt_frame->command eq 'RECEIPT'
&& $receipt_frame->headers->{'receipt-id'} eq $receipt_id )
{
+ return 1;
+ } else {
+ return 0;
+ }
+}
+
+sub send_transactional {
+ my ( $self, $conf ) = @_;
+ # begin the transaction
+ my $transaction_id = $self->_get_next_transaction;
+ my $begin_frame
+ = Net::Stomp::Frame->new(
+ { command => 'BEGIN', headers => { transaction => $transaction_id } }
+ );
+ $self->send_frame($begin_frame);
+
+ $conf->{transaction} = $transaction_id;
+ my $receipt_frame;
+ my $ret = $self->send_with_receipt($conf,$receipt_frame);
+
+ if (@_ > 2) {
+ $_[2] = $receipt_frame;
+ }
+
+ if ( $ret ) {
# success, commit the transaction
my $frame_commit = Net::Stomp::Frame->new(
{ command => 'COMMIT',
@@ -272,9 +289,7 @@ sub send_transactional {
}
);
$self->send_frame($frame_commit);
- return 1;
} else {
-
# some failure, abort transaction
my $frame_abort = Net::Stomp::Frame->new(
{ command => 'ABORT',
@@ -282,8 +297,8 @@ sub send_transactional {
}
);
$self->send_frame($frame_abort);
- return 0;
}
+ return $ret;
}
sub _sub_key {
@@ -300,6 +315,7 @@ sub subscribe {
$self->send_frame($frame);
my $subs = $self->subscriptions;
$subs->{_sub_key($conf)} = $conf;
+ return 1;
}
sub unsubscribe {
@@ -308,7 +324,8 @@ sub unsubscribe {
{ command => 'UNSUBSCRIBE', headers => $conf } );
$self->send_frame($frame);
my $subs = $self->subscriptions;
- delete $subs->{_sub_key($conf)}
+ delete $subs->{_sub_key($conf)};
+ return 1;
}
sub ack {
@@ -318,6 +335,7 @@ sub ack {
my $frame = Net::Stomp::Frame->new(
{ command => 'ACK', headers => { 'message-id' => $id, %$conf } } );
$self->send_frame($frame);
+ return 1;
}
sub send_frame {
@@ -346,6 +364,7 @@ sub send_frame {
$self->_reconnect;
$self->send_frame($frame);
}
+ return;
}
sub _read_data {
@@ -450,7 +469,7 @@ sub _connected {
sub receive_frame {
my ($self, $conf) = @_;
- my $timeout = exists $conf->{timeout} ? $conf->{timeout} : undef;
+ my $timeout = exists $conf->{timeout} ? $conf->{timeout} : $self->timeout;
unless (defined $self->_connected) {
$self->_reconnect;
@@ -725,6 +744,11 @@ sleep of L<< /C<connect_delay> >> seconds.
Integer, defaults to 5. How many seconds to sleep between connection
attempts to brokers.
+=head2 C<timeout>
+
+Integer, in seconds, defaults to C<undef>. The default timeout for
+read operations. C<undef> means "wait forever".
+
=head1 METHODS
=head2 C<connect>
@@ -751,10 +775,55 @@ It's probably a good idea to pass a C<content-length> corresponding to
the byte length of the C<body>; this is necessary if the C<body>
contains a byte 0.
+Always returns a true value. It automatically reconnects if writing to
+the socket fails.
+
+=head2 C<send_with_receipt>
+
+This sends a message asking for a receipt, and returns false if the
+receipt of the message is not acknowledged by the server:
+
+ $stomp->send_with_receipt(
+ { destination => '/queue/foo', body => 'test message' }
+ ) or die "Couldn't send the message!";
+
+If using ActiveMQ, you might also want to make the message persistent:
+
+ $stomp->send_transactional(
+ { destination => '/queue/foo', body => 'test message', persistent => 'true' }
+ ) or die "Couldn't send the message!";
+
+The actual frame sequence for a successful sending is:
+
+ -> SEND
+ <- RECEIPT
+
+The actual frame sequence for a failed sending is:
+
+ -> SEND
+ <- anything but RECEIPT
+
+If you are using this connection only to send (i.e. you've never
+called L<< /C<subscribe> >>), the only thing that could be received
+instead of a C<RECEIPT> is an C<ERROR> frame, but if you subscribed,
+the broker may well send a C<MESSAGE> before sending the
+C<RECEIPT>. B<DO NOT> use this method on a connection used for
+receiving.
+
+If you want to see the C<RECEIPT> or C<ERROR> frame, pass a scalar as
+a second parameter to the method, and it will be set to the received
+frame:
+
+ my $success = $stomp->send_transactional(
+ { destination => '/queue/foo', body => 'test message' },
+ $received_frame,
+ );
+ if (not $success) { warn $received_frame->as_string }
+
=head2 C<send_transactional>
-This sends a message in transactional mode and fails if the receipt of the
-message is not acknowledged by the server:
+This sends a message in transactional mode and returns false if the
+receipt of the message is not acknowledged by the server:
$stomp->send_transactional(
{ destination => '/queue/foo', body => 'test message' }
@@ -766,6 +835,9 @@ If using ActiveMQ, you might also want to make the message persistent:
{ destination => '/queue/foo', body => 'test message', persistent => 'true' }
) or die "Couldn't send the message!";
+C<send_transactional> just wraps C<send_with_receipt> in a STOMP
+transaction.
+
The actual frame sequence for a successful sending is:
-> BEGIN
@@ -807,11 +879,15 @@ If you call any other method after this, a new connection will be
established automatically (to the next failover host, if there's more
than one).
+Always returns a true value.
+
=head2 C<subscribe>
This subscribes you to a queue or topic. You must pass in a
C<destination>.
+Always returns a true value.
+
The acknowledge mode (header C<ack>) defaults to C<auto>, which means
that frames will be considered delivered after they have been sent to
a client. The other option is C<client>, which means that messages
@@ -901,6 +977,8 @@ C<destination> or an C<id>:
$stomp->unsubcribe({ destination => '/queue/foo' });
+Always returns a true value.
+
=head2 C<receive_frame>
This blocks and returns you the next Stomp frame, or C<undef> if there
@@ -909,8 +987,9 @@ was a connection problem.
my $frame = $stomp->receive_frame;
warn $frame->body; # do something here
-By default this method will block until a frame can be returned. If you wish to
-wait for a specified time pass a C<timeout> argument:
+By default this method will block until a frame can be returned, or
+for however long the L</timeout> attribue says. If you wish to wait
+for a specified time pass a C<timeout> argument:
# Wait half a second for a frame, else return undef
$stomp->receive_frame({ timeout => 0.5 })
@@ -923,8 +1002,9 @@ STOMP server. Optionally takes a timeout in seconds:
my $can_read = $stomp->can_read;
my $can_read = $stomp->can_read({ timeout => '0.1' });
-C<undef> says block until something can be read, C<0> says to poll and return
-immediately.
+C<undef> says block until something can be read, C<0> says to poll and
+return immediately. This method ignores the value of the L</timeout>
+attribute.
=head2 C<ack>
@@ -933,6 +1013,8 @@ all frames before it> (if you are using client acknowledgements):
$stomp->ack( { frame => $frame } );
+Always returns a true value.
+
=head2 C<send_frame>
If this module does not provide enough help for sending frames, you
@@ -949,6 +1031,8 @@ the connection breaks (limited by L<< /C<reconnect_attempts> >>). If
no connection can be established, and L<< /C<reconnect_attempts> >> is
not 0, this method will C<die>.
+Always returns an empty list.
+
=head1 SEE ALSO
L<Net::Stomp::Frame>.
@@ -2,9 +2,19 @@ use lib 't/lib';
use TestHelp;
use Net::Stomp::Frame;
-my ($s,$fh)=mkstomp_testsocket();
+my ($s,$fh)=mkstomp_testsocket(timeout=>1);
subtest 'one frame' => sub {
+ my $timeout_in_call;
+
+ my $orig = \&Net::Stomp::_read_data;
+ no warnings 'redefine';
+ local *Net::Stomp::_read_data = sub {
+ my ($self,$timeout) = @_;
+ $timeout_in_call=$timeout;
+ $self->$orig($timeout);
+ };
+
my $frame = Net::Stomp::Frame->new({
command=>'MESSAGE',
headers=>{'message-id'=>1},
@@ -14,6 +24,11 @@ subtest 'one frame' => sub {
$fh->{to_read}=$frame->as_string;
my $received = $s->receive_frame;
cmp_deeply($received,$frame,'received and parsed');
+ is($timeout_in_call,1,'correct timeout passed');
+
+ $fh->{to_read}=$frame->as_string;
+ $received = $s->receive_frame({timeout=>3});
+ is($timeout_in_call,3,'correct timeout passed');
};
subtest 'two frames' => sub {
@@ -200,4 +200,13 @@ subtest 'report failure if sysread EOF' => sub {
_test_receive;
};
+subtest 'send_with_receipt report failure if receive_frame does' => sub {
+ my $ret = do {
+ local $fh->{to_read} = sub {$!=1;return};
+ $s->send_with_receipt({some=>'header',body=>'string'});
+ };
+ ok(!$ret,'reported false');
+ ok(!defined($fh->{connected}),'socket closed');
+};
+
done_testing;
@@ -28,7 +28,7 @@ $fh->{written} = sub {
# -> COMMIT
sub _testit {
- my ($response_frame,$expected) = @_;
+ my ($response_frame,$expected_ret,$expected_command) = @_;
$fh->{to_read} = sub {
if ($frames[1]) {
return $response_frame->($frames[1]->headers->{receipt})
@@ -38,7 +38,9 @@ sub _testit {
};
@frames=();
- $s->send_transactional({some=>'header',body=>'string'});
+ my $ret = $s->send_transactional({some=>'header',body=>'string'});
+
+ cmp_deeply($ret,bool($expected_ret),"expected return value");
is(scalar(@frames),3,'3 frames sent');
@@ -69,12 +71,12 @@ sub _testit {
cmp_deeply(
$frames[2],
methods(
- command => uc($expected),
+ command => uc($expected_command),
headers => {
transaction=>$transaction,
},
),
- "\L$expected\E ok",
+ "\L$expected_command\E ok",
);
}
@@ -83,7 +85,7 @@ subtest 'successful' => sub {
command=>'RECEIPT',
headers=>{'receipt-id'=>$_[0]},
body=>undef,
- }) },'COMMIT');
+ }) },1,'COMMIT');
};
subtest 'failed' => sub {
@@ -91,7 +93,7 @@ subtest 'failed' => sub {
command=>'ERROR',
headers=>{some=>'header'},
body=>undef,
- }) },'ABORT');
+ }) },0,'ABORT');
};
subtest 'bad receipt' => sub {
@@ -99,7 +101,7 @@ subtest 'bad receipt' => sub {
command=>'RECEIPT',
headers=>{'receipt-id'=>"not-$_[0]"},
body=>undef,
- }) },'ABORT');
+ }) },0,'ABORT');
};
done_testing;
@@ -0,0 +1,81 @@
+use lib 't/lib';
+use TestHelp;
+use Net::Stomp::Frame;
+
+my ($s,$fh)=mkstomp_testsocket();
+
+my @frames;my $buffer='';
+$fh->{written} = sub {
+ $buffer .= $_[0];
+ my $frame = Net::Stomp::Frame->parse($buffer);
+ if ($frame) {
+ $buffer='';
+ push @frames,$frame;
+ }
+ return length($_[0]);
+};
+
+# expected:
+# -> SEND
+# <- RECEIPT
+#
+# or
+# -> SEND
+# <- something else
+
+sub _testit {
+ my ($response_frame,$expected) = @_;
+ $fh->{to_read} = sub {
+ if ($frames[0]) {
+ return $response_frame->($frames[0]->headers->{receipt})
+ ->as_string;
+ }
+ return '';
+ };
+
+ @frames=();
+ my $ret = $s->send_with_receipt({some=>'header',body=>'string'});
+
+ is(scalar(@frames),1,'1 frame sent');
+
+ cmp_deeply(
+ $frames[0],
+ methods(
+ command => 'SEND',
+ headers => {
+ some=>'header',
+ receipt=>ignore(),
+ },
+ body => 'string',
+ ),
+ 'send ok',
+ );
+
+ is ($ret,$expected,'return value as expected');
+}
+
+subtest 'successful' => sub {
+ _testit(sub{ Net::Stomp::Frame->new({
+ command=>'RECEIPT',
+ headers=>{'receipt-id'=>$_[0]},
+ body=>undef,
+ }) },1);
+};
+
+subtest 'failed' => sub {
+ _testit(sub{ Net::Stomp::Frame->new({
+ command=>'ERROR',
+ headers=>{some=>'header'},
+ body=>undef,
+ }) },0);
+};
+
+subtest 'bad receipt' => sub {
+ _testit(sub{ Net::Stomp::Frame->new({
+ command=>'RECEIPT',
+ headers=>{'receipt-id'=>"not-$_[0]"},
+ body=>undef,
+ }) },0);
+};
+
+done_testing;
@@ -11,7 +11,8 @@ my ($s,$fh)=mkstomp_testsocket();
sub _testit {
my ($method,$arg,@tests) = @_;
@frames=();
- $s->$method($arg);
+ my $ret = $s->$method($arg);
+ ok($ret,"$method returned true");
cmp_deeply(
\@frames,
[all(