package Net::Stomp;
use strict;
use warnings;
use IO::Select;
use Net::Stomp::Frame;
use Carp;
use base 'Class::Accessor::Fast';
our $VERSION = '0.46';
__PACKAGE__->mk_accessors( qw(
_cur_host failover hostname hosts port select serial session_id socket ssl
ssl_options subscriptions _connect_headers bufsize
reconnect_on_fork
) );
sub new {
my $class = shift;
my $self = $class->SUPER::new(@_);
$self->bufsize(8192) unless $self->bufsize;
$self->reconnect_on_fork(1) unless defined $self->reconnect_on_fork;
$self->{_framebuf} = "";
# We are not subscribed to anything at the start
$self->subscriptions( {} );
$self->select( IO::Select->new );
my @hosts = ();
# failover://tcp://primary:61616
# failover:(tcp://primary:61616,tcp://secondary:61616)?randomize=false
if ($self->failover) {
my ($uris, $opts) = $self->failover =~ m{^failover:(?://)? \(? (.*?) \)? (?: \? (.*?) ) ?$}ix;
confess "Unable to parse failover uri: " . $self->failover
unless $uris;
foreach my $host (split(/,/,$uris)) {
$host =~ m{^\w+://([a-zA-Z0-9\-./]+):([0-9]+)$} || confess "Unable to parse failover component: '$host'";
my ($hostname, $port) = ($1, $2);
push(@hosts, {hostname => $hostname, port => $port});
}
} elsif ($self->hosts) {
## @hosts is used inside the while loop later to decide whether we have
## cycled through all setup hosts.
@hosts = @{$self->hosts};
}
$self->hosts(@hosts);
my $err;
{
local $@ = 'run me!';
while($@) {
eval { $self->_get_connection };
last unless $@;
if (!@hosts || $self->_cur_host == $#hosts ) {
# We've cycled through all setup hosts. Die now. Can't die because
# $@ is localized.
$err = $@;
last;
}
sleep(5);
}
}
die $err if $err;
return $self;
}
my $socket_class;
sub _get_connection {
my $self = shift;
if (my $hosts = $self->hosts) {
if (defined $self->_cur_host && ($self->_cur_host < $#{$hosts} ) ) {
$self->_cur_host($self->_cur_host+1);
} else {
$self->_cur_host(0);
}
$self->hostname($hosts->[$self->_cur_host]->{hostname});
$self->port($hosts->[$self->_cur_host]->{port});
}
my ($socket);
my %sockopts = (
PeerAddr => $self->hostname,
PeerPort => $self->port,
Proto => 'tcp',
Timeout => 5
);
if ( $self->ssl ) {
eval { require IO::Socket::SSL };
die
"You should install the IO::Socket::SSL module for SSL support in Net::Stomp"
if $@;
%sockopts = ( %sockopts, %{ $self->ssl_options || {} } );
$socket = IO::Socket::SSL->new(%sockopts);
} else {
$socket_class ||= eval { require IO::Socket::IP; IO::Socket::IP->VERSION('0.20'); "IO::Socket::IP" }
|| do { require IO::Socket::INET; "IO::Socket::INET" };
$socket = $socket_class->new(%sockopts);
binmode($socket) if $socket;
}
die "Error connecting to " . $self->hostname . ':' . $self->port . ": $@"
unless $socket;
$self->select->remove($self->socket) if $self->socket;
$self->select->add($socket);
$self->socket($socket);
$self->{_pid} = $$;
}
sub connect {
my ( $self, $conf ) = @_;
my $frame = Net::Stomp::Frame->new(
{ command => 'CONNECT', headers => $conf } );
$self->send_frame($frame);
$frame = $self->receive_frame;
# Setting initial values for session id, as given from
# the stomp server
$self->session_id( $frame->headers->{session} );
$self->_connect_headers( $conf );
return $frame;
}
sub disconnect {
my $self = shift;
my $frame = Net::Stomp::Frame->new( { command => 'DISCONNECT' } );
$self->send_frame($frame);
$self->socket->close;
$self->select->remove($self->socket);
}
sub _reconnect {
my $self = shift;
if ($self->socket) {
$self->socket->close;
}
eval { $self->_get_connection };
while ($@) {
sleep(5);
eval { $self->_get_connection };
}
$self->connect( $self->_connect_headers );
for my $sub(keys %{$self->subscriptions}) {
$self->subscribe($self->subscriptions->{$sub});
}
}
sub can_read {
my ( $self, $conf ) = @_;
# If there is any data left in the framebuffer that we haven't read, return
# 'true'. But we don't want to spin endlessly, so only return true the
# first time. (Anything touching the _framebuf should update this flag when
# it does something.
if ( $self->{_framebuf_changed} && length $self->{_framebuf} ) {
$self->{_framebuf_changed} = 0;
return 1;
}
$conf ||= {};
my $timeout = exists $conf->{timeout} ? $conf->{timeout} : undef;
return $self->select->can_read($timeout) || 0;
}
sub send {
my ( $self, $conf ) = @_;
my $body = $conf->{body};
delete $conf->{body};
my $frame = Net::Stomp::Frame->new(
{ command => 'SEND', headers => $conf, body => $body } );
$self->send_frame($frame);
}
sub send_transactional {
my ( $self, $conf ) = @_;
my $body = $conf->{body};
delete $conf->{body};
# begin the transaction
my $transaction_id = $self->_get_next_transaction;
my $begin_frame
= Net::Stomp::Frame->new(
{ command => 'BEGIN', headers => { transaction => $transaction_id } }
);
$self->send_frame($begin_frame);
# send the message
my $receipt_id = $self->_get_next_transaction;
$conf->{receipt} = $receipt_id;
my $message_frame = Net::Stomp::Frame->new(
{ command => 'SEND', headers => $conf, body => $body } );
$self->send_frame($message_frame);
# check the receipt
my $receipt_frame = $self->receive_frame;
if ( $receipt_frame->command eq 'RECEIPT'
&& $receipt_frame->headers->{'receipt-id'} eq $receipt_id )
{
# success, commit the transaction
my $frame_commit = Net::Stomp::Frame->new(
{ command => 'COMMIT',
headers => { transaction => $transaction_id }
}
);
return $self->send_frame($frame_commit);
} else {
# some failure, abort transaction
my $frame_abort = Net::Stomp::Frame->new(
{ command => 'ABORT',
headers => { transaction => $transaction_id }
}
);
$self->send_frame($frame_abort);
return 0;
}
}
sub _sub_key {
my ($conf) = @_;
if ($conf->{id}) { return "id-".$conf->{id} }
return "dest-".$conf->{destination}
}
sub subscribe {
my ( $self, $conf ) = @_;
my $frame = Net::Stomp::Frame->new(
{ command => 'SUBSCRIBE', headers => $conf } );
$self->send_frame($frame);
my $subs = $self->subscriptions;
$subs->{_sub_key($conf)} = $conf;
}
sub unsubscribe {
my ( $self, $conf ) = @_;
my $frame = Net::Stomp::Frame->new(
{ command => 'UNSUBSCRIBE', headers => $conf } );
$self->send_frame($frame);
my $subs = $self->subscriptions;
delete $subs->{_sub_key($conf)}
}
sub ack {
my ( $self, $conf ) = @_;
my $id = $conf->{frame}->headers->{'message-id'};
my $frame = Net::Stomp::Frame->new(
{ command => 'ACK', headers => { 'message-id' => $id } } );
$self->send_frame($frame);
}
sub send_frame {
my ( $self, $frame ) = @_;
# see if we're connected before we try to syswrite()
if (not defined $self->_connected) {
$self->_reconnect;
if (not defined $self->_connected) {
warn q{wasn't connected; couldn't _reconnect()};
}
}
my $written = $self->socket->syswrite( $frame->as_string );
if (($written||0) != length($frame->as_string)) {
warn 'only wrote '
. ($written||0)
. ' characters out of the '
. length($frame->as_string)
. ' character frame';
warn 'problem frame: <<' . $frame->as_string . '>>';
}
unless (defined $self->_connected) {
$self->_reconnect;
$self->send_frame($frame);
}
}
sub _read_data {
my ($self, $timeout) = @_;
return unless $self->select->can_read($timeout);
my $len = $self->socket->sysread($self->{_framebuf},
$self->bufsize,
length($self->{_framebuf} || ''));
if ($len && $len > 0) {
$self->{_framebuf_changed} = 1;
}
else {
# EOF detected - connection is gone. We have to reset the framebuf in
# case we had a partial frame in there that will never arrive.
$self->{_framebuf} = "";
delete $self->{_command};
delete $self->{_headers};
}
return $len;
}
sub _read_headers {
my ($self) = @_;
if ($self->{_framebuf} =~ s/^\n*([^\n].*?)\n\n//s) {
$self->{_framebuf_changed} = 1;
my $raw_headers = $1;
if ($raw_headers =~ s/^(.+)\n//) {
$self->{_command} = $1;
}
foreach my $line (split(/\n/, $raw_headers)) {
my ($key, $value) = split(/\s*:\s*/, $line, 2);
$self->{_headers}->{$key} = $value;
}
return 1;
}
return 0;
}
sub _read_body {
my ($self) = @_;
my $h = $self->{_headers};
if ($h->{'content-length'}) {
if (length($self->{_framebuf}) >= $h->{'content-length'}) {
$self->{_framebuf_changed} = 1;
my $body = substr($self->{_framebuf},
0,
$h->{'content-length'},
'' );
# Trim the trailer off the frame.
$self->{_framebuf} =~ s/^.*?\000\n*//s;
return Net::Stomp::Frame->new({
command => delete $self->{_command},
headers => delete $self->{_headers},
body => $body
});
}
} elsif ($self->{_framebuf} =~ s/^(.*?)\000\n*//s) {
# No content-length header.
my $body = $1;
$self->{_framebuf_changed} = 1;
return Net::Stomp::Frame->new({
command => delete $self->{_command},
headers => delete $self->{_headers},
body => $body });
}
return 0;
}
# this method is to stop the pointless warnings being thrown when trying to
# call peername() on a closed socket, i.e.
# getpeername() on closed socket GEN125 at
# /opt/xt/xt-perl/lib/5.12.3/x86_64-linux/IO/Socket.pm line 258.
#
# solution taken from:
# http://objectmix.com/perl/80545-warning-getpeername.html
sub _connected {
my $self = shift;
return if $self->{_pid} != $$ and $self->reconnect_on_fork;
my $connected;
{
local $^W = 0;
$connected = $self->socket->connected;
}
return $connected;
}
sub receive_frame {
my ($self, $conf) = @_;
my $timeout = exists $conf->{timeout} ? $conf->{timeout} : undef;
unless (defined $self->_connected) {
$self->_reconnect;
}
my $done = 0;
while ( not $done = $self->_read_headers ) {
return undef unless $self->_read_data($timeout);
}
while ( not $done = $self->_read_body ) {
return undef unless $self->_read_data($timeout);
}
return $done;
}
sub _get_next_transaction {
my $self = shift;
my $serial = $self->serial || 0;
$serial++;
$self->serial($serial);
return ($self->session_id||'nosession') . '-' . $serial;
}
1;
__END__
=head1 NAME
Net::Stomp - A Streaming Text Orientated Messaging Protocol Client
=head1 SYNOPSIS
# send a message to the queue 'foo'
use Net::Stomp;
my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
$stomp->connect( { login => 'hello', passcode => 'there' } );
$stomp->send(
{ destination => '/queue/foo', body => 'test message' } );
$stomp->disconnect;
# subscribe to messages from the queue 'foo'
use Net::Stomp;
my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
$stomp->connect( { login => 'hello', passcode => 'there' } );
$stomp->subscribe(
{ destination => '/queue/foo',
'ack' => 'client',
'activemq.prefetchSize' => 1
}
);
while (1) {
my $frame = $stomp->receive_frame;
warn $frame->body; # do something here
$stomp->ack( { frame => $frame } );
}
$stomp->disconnect;
# write your own frame
my $frame = Net::Stomp::Frame->new(
{ command => $command, headers => $conf, body => $body } );
$self->send_frame($frame);
# connect with failover supporting similar URI to ActiveMQ
$stomp = Net::Stomp->new({ failover => "failover://tcp://primary:61616" })
# "?randomize=..." and other parameters are ignored currently
$stomp = Net::Stomp->new({ failover => "failover:(tcp://primary:61616,tcp://secondary:61616)?randomize=false" })
# Or in a more natural perl way
$stomp = Net::Stomp->new({ hosts => [
{ hostname => 'primary', port => 61616 },
{ hostname => 'secondary', port => 61616 },
] });
=head1 DESCRIPTION
This module allows you to write a Stomp client. Stomp is the Streaming
Text Orientated Messaging Protocol (or the Protocol Briefly Known as
TTMP and Represented by the symbol :ttmp). It's a simple and easy to
implement protocol for working with Message Orientated Middleware from
any language. L<Net::Stomp> is useful for talking to Apache ActiveMQ,
an open source (Apache 2.0 licensed) Java Message Service 1.1 (JMS)
message broker packed with many enterprise features.
A Stomp frame consists of a command, a series of headers and a body -
see L<Net::Stomp::Frame> for more details.
For details on the protocol see L<http://stomp.codehaus.org/Protocol>.
To enable the ActiveMQ Broker for Stomp add the following to the
activemq.xml configuration inside the <transportConnectors> section:
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
To enable the ActiveMQ Broker for Stomp and SSL add the following
inside the <transportConnectors> section:
<transportConnector name="stomp+ssl" uri="stomp+ssl://localhost:61612"/>
For details on Stomp in ActiveMQ See L<http://activemq.apache.org/stomp.html>.
=head1 METHODS
=head2 new
The constructor creates a new object. You must pass in a hostname and
a port or set a failover configuration:
my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
If you want to use SSL, make sure you have L<IO::Socket::SSL> and
pass in the SSL flag:
my $stomp = Net::Stomp->new( {
hostname => 'localhost',
port => '61612',
ssl => 1,
} );
If you want to pass in L<IO::Socket::SSL> options:
my $stomp = Net::Stomp->new( {
hostname => 'localhost',
port => '61612',
ssl => 1,
ssl_options => { SSL_cipher_list => 'ALL:!EXPORT' },
} );
=head3 Failover
There is experiemental failover support in Net::Stomp. You can specify failover
in a similar maner to ActiveMQ
(L<http://activemq.apache.org/failover-transport-reference.html>) for
similarity with Java configs or using a more natural method to perl of passing
in an array-of-hashrefs in the C<hosts> parameter.
Currently when ever Net::Stomp connects or reconnects it will simply try the
next host in the list.
=head3 Reconnect on C<fork>
By default Net::Stomp will reconnect, using a different socket, if the
process C<fork>s. This avoids problems when parent & child write to
the socket at the same time. If, for whatever reason, you don't want
this to happen, set C<reconnect_on_fork> to C<0> (either as a
constructor parameter, or by calling the method).
=head2 connect
This connects to the Stomp server. You may pass in a C<login> and
C<passcode> options.
You may also pass in 'client-id', which specifies the JMS Client ID which is
used in combination to the activemqq.subscriptionName to denote a durable
subscriber.
$stomp->connect( { login => 'hello', passcode => 'there' } );
=head2 send
This sends a message to a queue or topic. You must pass in a destination and a
body.
$stomp->send(
{ destination => '/queue/foo', body => 'test message' } );
To send a BytesMessage, you should set the field 'bytes_message' to 1.
=head2 send_transactional
This sends a message in transactional mode and fails if the receipt of the
message is not acknowledged by the server:
$stomp->send_transactional(
{ destination => '/queue/foo', body => 'test message' }
) or die "Couldn't send the message!";
If using ActiveMQ, you might also want to make the message persistent:
$stomp->send_transactional(
{ destination => '/queue/foo', body => 'test message', persistent => 'true' }
) or die "Couldn't send the message!";
=head2 disconnect
This disconnects from the Stomp server:
$stomp->disconnect;
=head2 subscribe
This subscribes you to a queue or topic. You must pass in a destination.
The acknowledge mode defaults to 'auto', which means that frames will
be considered delivered after they have been sent to a client. The
other option is 'client', which means that messages will only be
considered delivered after the client specifically acknowledges them
with an ACK frame.
Other options:
'selector': which specifies a JMS Selector using SQL
92 syntax as specified in the JMS 1.1 specificiation. This allows a
filter to be applied to each message as part of the subscription.
'activemq.dispatchAsync': should messages be dispatched synchronously
or asynchronously from the producer thread for non-durable topics in
the broker. For fast consumers set this to false. For slow consumers
set it to true so that dispatching will not block fast consumers.
'activemq.exclusive': Would I like to be an Exclusive Consumer on a queue.
'activemq.maximumPendingMessageLimit': For Slow Consumer Handlingon
non-durable topics by dropping old messages - we can set a maximum
pending limit which once a slow consumer backs up to this high water
mark we begin to discard old messages.
'activemq.noLocal': Specifies whether or not locally sent messages
should be ignored for subscriptions. Set to true to filter out locally
sent messages.
'activemq.prefetchSize': Specifies the maximum number of pending
messages that will be dispatched to the client. Once this maximum is
reached no more messages are dispatched until the client acknowledges
a message. Set to 1 for very fair distribution of messages across
consumers where processing messages can be slow.
'activemq.priority': Sets the priority of the consumer so that
dispatching can be weighted in priority order.
'activemq.retroactive': For non-durable topics do you wish this
subscription to the retroactive.
'activemq.subscriptionName': For durable topic subscriptions you must
specify the same clientId on the connection and subscriberName on the
subscribe.
$stomp->subscribe(
{ destination => '/queue/foo',
'ack' => 'client',
'activemq.prefetchSize' => 1
}
);
=head2 unsubscribe
This unsubscribes you to a queue or topic. You must pass in a destination:
$stomp->unsubcribe({ destination => '/queue/foo' });
=head2 receive_frame
This blocks and returns you the next Stomp frame.
my $frame = $stomp->receive_frame;
warn $frame->body; # do something here
The header bytes_message is 1 if the message was a BytesMessage.
By default this method will block until a frame can be returned. If you wish to
wait for a specified time pass a C<timeout> argument:
# Wait half a second for a frame, else return undef
$stomp->receive_frame({ timeout => 0.5 })
=head2 can_read
This returns whether there is new data is waiting to be read from the STOMP
server. Optionally takes a timeout in seconds:
my $can_read = $stomp->can_read;
my $can_read = $stomp->can_read({ timeout => '0.1' });
C<undef> says block until something can be read, C<0> says to poll and return
immediately.
=head2 ack
This acknowledges that you have received and processed a frame (if you
are using client acknowledgements):
$stomp->ack( { frame => $frame } );
=head2 send_frame
If this module does not provide enough help for sending frames, you
may construct your own frame and send it:
# write your own frame
my $frame = Net::Stomp::Frame->new(
{ command => $command, headers => $conf, body => $body } );
$self->send_frame($frame);
=head1 SEE ALSO
L<Net::Stomp::Frame>.
=head1 AUTHORS
Leon Brocard <acme@astray.com>,
Thom May <thom.may@betfair.com>,
Michael S. Fischer <michael@dynamine.net>,
Ash Berlin <ash_github@firemirror.com>
=head1 CONTRIBUTORS
Paul Driver <frodwith@cpan.org>,
Andreas Faafeng <aff@cpan.org>,
Vigith Maurice <vigith@yahoo-inc.com>,
Stephen Fralich <sjf4@uw.edu>,
Squeeks <squeek@cpan.org>,
Chisel Wright <chisel@chizography.net>,
=head1 COPYRIGHT
Copyright (C) 2006-9, Leon Brocard
Copyright (C) 2009, Thom May, Betfair.com
Copyright (C) 2010, Ash Berlin, Net-a-Porter.com
Copyright (C) 2010, Michael S. Fischer
This module is free software; you can redistribute it or modify it
under the same terms as Perl itself.