The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package POE::Component::Client::Stomp;

use POE;
use Carp;
use Socket ':all'; 
use POE::Filter::Stomp;
use POE::Wheel::ReadWrite;
use POE::Wheel::SocketFactory;
use POE::Component::Client::Stomp::Utils;

use 5.8.2;
use strict;
use warnings;

my $TCP_KEEPCNT = 0;
my $TCP_KEEPIDLE = 0;
my $TCP_KEEPINTVL = 0;

if ($^O eq "aix") {           # from /usr/include/netinet/tcp.h

    $TCP_KEEPIDLE  = 0x11;
    $TCP_KEEPINTVL = 0x12;
    $TCP_KEEPCNT   = 0x13;

} elsif ($^O eq "linux"){     # from /usr/include/netinet/tcp.h

    $TCP_KEEPIDLE  = 4;
    $TCP_KEEPINTVL = 5;
    $TCP_KEEPCNT   = 6;

}

my @errors = qw(0 32 68 73 78 79 110 104 111);
my @reconnections = qw(60 120 240 480 960 1920 3840);

our $VERSION = '0.12';

use Data::Dumper;

# ---------------------------------------------------------------------
# Public Methods
# ---------------------------------------------------------------------

sub spawn {
    my $package = shift;

    croak "$package requires an even number of parameters" if @_ & 1;

    my %args = @_;
    my $self = bless ({}, $package);

    $args{Alias}           = 'stomp-client' unless defined $args{Alias} and $args{Alias};
    $args{RetryReconnect}  = 1 unless defined $args{RetryReconnect};
    $args{EnableKeepAlive} = defined($args{EnableKeepAlive}) ? $args{EnableKeepAlive} : 0;
    $args{RemoteAddress}   = 'localhost' unless defined $args{RemoteAddress};
    $args{RemotePort}      = 61613 unless defined $args{RemotePort};

    $self->{CONFIG} = \%args;
    $self->{count} = scalar(@reconnections);
    $self->{stomp} = POE::Component::Client::Stomp::Utils->new();
    $self->{attempts} = 0;

    POE::Session->create(
        object_states => [
            $self => { 
                _start            => '_session_start',
                _stop             => '_session_stop',
                server_connect    => '_server_connect',
                server_connected  => '_server_connected',
                server_reconnect  => '_server_connect',
                server_error      => '_server_error',
                server_message    => '_server_message',
                server_connection_failed => '_server_connection_failed', 
                session_interrupt => '_session_interrupt',
                session_reload    => '_session_reload',
                shutdown          => '_session_shutdown',
            },
            $self => [ qw( 
                handle_message 
                handle_receipt 
                handle_error
                handle_connected 
                handle_connection
                send_data 
                gather_data 
                connection_down
                connection_up
            ) ],
        ],
    );

    return $self;

}

sub log {
    my ($self, $kernel, $level, @args) = @_;

    warn sprintf("%-5s - %s\n", uc($level), join(' ', @args));

}

sub handle_reload {
    my ($self, $kernel, $session) = @_;

    $kernel->sig_handled();

}

sub handle_shutdown {
    my ($self, $kernel, $session) = @_;

    my $params = {};
    my $frame = $self->stomp->disconnect($params);

    $kernel->call($session, 'send_data', $frame);

}

# ---------------------------------------------------------------------
# Public Accessors
# ---------------------------------------------------------------------

sub stomp {
    my $self = shift;

    return $self->{stomp};

}

sub config {
    my ($self, $arg) = @_;

    return $self->{CONFIG}->{$arg};

}

sub host {
    my $self = shift;

    return $self->{Host};

}

sub port {
    my $self = shift;

    return $self->{Port};

}

# ---------------------------------------------------------------------
# Public Events
# ---------------------------------------------------------------------

sub handle_connection {
    my ($kernel, $self) = @_[KERNEL, OBJECT];

}

sub handle_connected {
    my ($kernel, $self, $frame) = @_[KERNEL, OBJECT, ARG0];

}

sub handle_message {
    my ($kernel, $self, $frame) = @_[KERNEL, OBJECT, ARG0];

}

sub handle_receipt {
    my ($kernel, $self, $frame) = @_[KERNEL, OBJECT, ARG0];

}

sub handle_error {
    my ($kernel, $self, $frame) = @_[KERNEL, OBJECT, ARG0];

}

sub connection_down {
    my ($kernel, $self) = @_[KERNEL, OBJECT];

}

sub connection_up {
    my ($kernel, $self) = @_[KERNEL, OBJECT];

}

sub gather_data {
    my ($kernel, $self, $frame) = @_[KERNEL, OBJECT, ARG0];

}

sub send_data {
    my ($kernel, $self, $frame) = @_[KERNEL, OBJECT, ARG0];

    if (defined($self->{Wheel})) {

        $self->{Wheel}->put($frame);

    }

}

# ---------------------------------------------------------------------
# Private Events
# ---------------------------------------------------------------------

sub _session_start {
    my ($kernel, $self) = @_[KERNEL, OBJECT];

    my $alias = $self->config('Alias');

    $self->log($kernel, 'debug', "$alias: _session_start()");

    if ((my $rc = $kernel->alias_set($alias)) > 0) {

        croak 'unable to assign an alias to this session';

    }

    # set up signal handling.

    $kernel->sig(HUP  => 'session_interrupt');
    $kernel->sig(INT  => 'session_interrupt');
    $kernel->sig(TERM => 'session_interrupt');
    $kernel->sig(QUIT => 'session_interrupt');

    $kernel->yield('server_connect');

}

sub _session_stop {
    my ($kernel, $self) = @_[KERNEL, OBJECT];

    delete $self->{Listner};
    delete $self->{Wheel};

    $kernel->alias_remove($self->config('Alias'));

}

sub _session_reload {
    my ($kernel, $self, $session) = @_[KERNEL,OBJECT,SESSION];

    $self->handle_reload($kernel, $session);

}

sub _session_interrupt {
    my ($kernel, $self, $session, $signal) = @_[KERNEL,OBJECT,SESSION,ARG0];

    my $alias = $self->config('Alias');

    $self->log($kernel, 'debug', "$alias: _session_interrupt()");

    if ($signal eq 'HUP') {

        $self->handle_reload($kernel, $session);

    } else {

        $self->handle_shutdown($kernel, $session);

    }

}

sub _session_shutdown {
    my ($kernel, $self, $session) = @_[KERNEL, OBJECT, SESSION];

    my $alias = $self->config('Alias');

    $self->log($kernel, 'debug', "$alias: _session_shutdown()");

    $self->handle_shutdown($kernel, $session);

}

sub _server_connect {
    my ($kernel, $self) = @_[KERNEL, OBJECT];

    my $alias = $self->config('Alias');

    $self->log($kernel, 'debug', "$alias: _server_connect()");

    $self->{Listner} = POE::Wheel::SocketFactory->new(
        RemoteAddress  => $self->config('RemoteAddress'),
        RemotePort     => $self->config('RemotePort'),
        SocketType     => SOCK_STREAM,
        SocketDomain   => AF_INET,
        Reuse          => 'no',
        SocketProtocol => 'tcp',
        SuccessEvent   => 'server_connected',
        FailureEvent   => 'server_connection_failed',
    );

}

sub _server_connected {
    my ($kernel, $self, $socket, $peeraddr, $peerport, $wheel_id) = 
       @_[KERNEL, OBJECT, ARG0 .. ARG3];

    my $os = $^O;
    my $alias = $self->config('Alias');
    
    $self->log($kernel, 'debug', "$alias: _server_connected()");

    my $wheel = POE::Wheel::ReadWrite->new(
        Handle => $socket,
        Filter => POE::Filter::Stomp->new(),
        InputEvent => 'server_message',
        ErrorEvent => 'server_error',
    );

    if ($self->config('EnableKeepAlive')) {

        $self->log($kernel, 'debug', "$alias: keepalive activated");

        # turn keepalive on, this should send a keepalive 
        # packet once every 2 hours according to the RFC.

        setsockopt($socket, SOL_SOCKET,  SO_KEEPALIVE,  1);

        if (($os eq 'linux') or ($os eq 'aix')) {

            $self->log($kernel, 'debug', "$alias: adjusting keepalive activity");

            # adjust from system defaults, all values are in seconds.
            # so this does the following:
            #     every 15 minutes send upto 3 packets at 5 second intervals
            #         if no reply, the connection is down.

            setsockopt($socket, IPPROTO_TCP, $TCP_KEEPIDLE,  900);  # 15 minutes
            setsockopt($socket, IPPROTO_TCP, $TCP_KEEPINTVL, 5);    # 
            setsockopt($socket, IPPROTO_TCP, $TCP_KEEPCNT,   3);    # 

        }

    }

    my $host = gethostbyaddr($peeraddr, AF_INET);

    $self->{attempts} = 0;
    $self->{Wheel} = $wheel;
    $self->{Host} = $host;
    $self->{Port} = $peerport;

    $kernel->yield('handle_connection');

}

sub _server_connection_failed {
    my ($kernel, $self, $operation, $errnum, $errstr, $wheel_id) = 
        @_[KERNEL, OBJECT, ARG0 .. ARG3];

    my $alias = $self->config('Alias');

    $self->log($kernel, 'debug', "$alias: _server_connection_failed()");
    $self->log($kernel, 'error', "$alias: operation: $operation; reason: $errnum - $errstr");

    delete $self->{Listner};
    delete $self->{Wheel};

    foreach my $error (@errors) {

        $self->_reconnect($kernel) if ($errnum == $error);

    }

}

sub _server_error {
    my ($kernel, $self, $operation, $errnum, $errstr, $wheel_id) = 
        @_[KERNEL, OBJECT, ARG0 .. ARG3];

    my $alias = $self->config('Alias');

    $self->log($kernel, 'debug', "$alias: _server_error()");
    $self->log($kernel, 'error', "$alias: operation: $operation; reason: $errnum - $errstr");

    delete $self->{Listner};
    delete $self->{Wheel};

    $kernel->yield('connection_down');

    foreach my $error (@errors) {

        $self->_reconnect($kernel) if ($errnum == $error);

    }

}

sub _server_message {
    my ($kernel, $self, $frame, $wheel_id) = @_[KERNEL, OBJECT, ARG0, ARG1];

    my $alias = $self->config('Alias');

    $self->log($kernel, 'debug' , "$alias: _server_message()");

    if ($frame->command eq 'CONNECTED') {

        $self->log($kernel, 'debug' , "$alias: received a \"CONNECTED\" message");
        $kernel->yield('handle_connected', $frame);

    } elsif ($frame->command eq 'MESSAGE') {

        $self->log($kernel, 'debug' , "$alias: received a \"MESSAGE\" message");
        $kernel->yield('handle_message', $frame);

    } elsif ($frame->command eq 'RECEIPT') {

        $self->log($kernel, 'debug' , "$alias: received a \"RECEIPT\" message");
        $kernel->yield('handle_receipt', $frame);

    } elsif ($frame->command eq 'ERROR') {

        $self->log($kernel, 'debug' , "$alias: received an \"ERROR\" message");
        $kernel->yield('handle_error', $frame);

    } else {

        $self->log($kernel, 'warn', "$alias: unknown message type: $frame->command");

    }

}

# ---------------------------------------------------------------------
# Private Methods
# ---------------------------------------------------------------------

sub _reconnect {
    my ($self, $kernel) = @_;

    my $retry;
    my $alias = $self->config('Alias');

    $self->log($kernel, 'debug', "$alias: attempts: $self->{attempts}, count: $self->{count}");

    if ($self->{attempts} < $self->{count}) {

        my $delay = $reconnections[$self->{attempts}];
        $self->log($kernel, 'warn', "$alias: attempting reconnection: $self->{attempts}, waiting: $delay seconds");
        $self->{attempts}++;
        $kernel->delay('server_reconnect', $delay);

    } else {

        $retry = $self->config('RetryReconnect') || 0;

        if ($retry) {

            $self->log($kernel, 'warn', "$alias: cycling reconnection attempts, but not shutting down...");
            $self->{attempts} = 0;
            $kernel->yield('server_reconnect');

        } else {

            $self->log($kernel, 'warn', "$alias: shutting down, to many reconnection attempts");
            $kernel->yield('shutdown'); 

        }

    }

}

1;

__END__

=head1 NAME

POE::Component::Client::Stomp - A Perl extension for the POE Environment

=head1 SYNOPSIS

This module is a class used to create clients that need to access a 
message server that communicates with the STOMP protocol. Your program could 
look as follows:

 package Client;

 use POE;
 use base qw(POE::Component::Client::Stomp);

 use strict;
 use warnings;

 sub handle_connection {
    my ($kernel, $self) = @_[KERNEL, OBJECT];
 
    my $nframe = $self->stomp->connect({login => 'testing', 
                                        passcode => 'testing'});
    $kernel->yield('send_data' => $nframe);

 }

 sub handle_connected {
    my ($kernel, $self, $frame) = @_[KERNEL, OBJECT, ARG0];

    my $nframe = $self->stomp->subscribe({destination => $self->config('Queue'), 
                                          ack => 'client'});
    $kernel->yield('send_data' => $nframe);

 }
 
 sub handle_message {
    my ($kernel, $self, $frame) = @_[KERNEL, OBJECT, ARG0];

    my $message_id = $frame->headers->{'message-id'};
    my $nframe = $self->stomp->ack({'message-id' => $message_id});
    $kernel->yield('send_data' => $nframe);

 }

 package main;

 use POE;
 use strict;

    Client->spawn(
        Alias => 'testing',
        Queue => '/queue/testing',
    );

    $poe_kernel->run();

    exit 0;


=head1 DESCRIPTION

This module handles the nitty-gritty details of setting up the communications 
channel to a message queue server. You will need to sub-class this module
with your own for it to be usefull.

An attempt to maintain that channel will be made when/if that server should 
happen to disappear off the network. There is nothing more unpleasent then 
having to go around to dozens of servers and restarting processes.

When messages are received, specific events are generated. Those events are 
based on the message type. If you are interested in those events you should 
override the default behaviour for those events. The default behaviour is to 
do nothing.

=head1 METHODS

=over 4

=item spawn

This method initializes the class and starts a session to handle the 
communications channel. The only parameters that having meaning are:

=over 4

 Alias           - The session alias, defaults to 'stomp-client'
 RemoteAddress   - The servers hostname, defaults to 'localhost'
 RemotePort      - The servers port, defaults to '61613'
 RetryReconnect  - Wither to attempt reconnections after they run out
 EnableKeepAlive - For those pesky firewalls, defaults to false

=back

All other parameters are stored within an internal config.

=item send_data

You use this event to send Stomp frames to the server. 

=over 4

=item Example

 $kernel->yield('send_data', $frame);

=back

=item handle_connection

This event is signaled and the corresponding method is called upon initial 
connection to the message server. For the most part you should send a 
"connect" frame to the server.

=over 4

=item Example

 sub handle_connection {
     my ($kernel, $self) = @_[KERNEL,$OBJECT];
 
    my $nframe = $self->stomp->connect({login => 'testing', 
                                        passcode => 'testing'});
    $kernel->yield('send_data' => $nframe);
     
 }

=back

=item handled_connected

This event and corresponing method is called when a "CONNECT" frame is 
received from the server. This means the server will allow you to start
generating/processing frames.

=over 4

=item Example

 sub handle_connected {
     my ($kernel, $self, $frame) = @_[KERNEL,$OBJECT,ARG0];
 
     my $nframe = $self->stomp->subscribe({destination => $self->config('Queue'), 
                                           ack => 'client'});
     $kernel->yield('send_data' => $nframe);
     
 }

This example shows you how to subscribe to a particular queue. The queue name
was passed as a parameter to spawn() so it is available in the $self->{CONFIG}
hash.

=back

=item handle_message

This event and corresponding method is used to process "MESSAGE" frames. 

=over 4

=item Example

 sub handle_message {
     my ($kernel, $self, $frame) = @_[KERNEL,$OBJECT,ARG0];
 
     my $message_id = $frame->headers->{'message-id'};
     my $nframe = $self->stomp->ack({'message-id' => $message_id});
     $kernel->yield('send_data' => $nframe);
     
 }

This example really doesn't do much other then "ack" the messages that are
received. 

=back

=item handle_receipt

This event and corresponding method is used to process "RECEIPT" frames. 

=over 4

=item Example

 sub handle_receipt {
     my ($kernel, $self, $frame) = @_[KERNEL,$OBJECT,ARG0];
 
     my $receipt = $frame->headers->{receipt};
     
 }

This example really doesn't do much, and you really don't need to worry about
receipts unless you ask for one when you send a frame to the server. So this 
method could be safely left with the default.

=back

=item handle_error

This event and corresponding method is used to process "ERROR" frames. 

=over 4

=item Example

 sub handle_error {
     my ($kernel, $self, $frame) = @_[KERNEL,$OBJECT,ARG0];
 
 }

This example really doesn't do much. Error handling is pretty much what the
process needs to do when something unexpected happens.

=back

=item gather_data

This event and corresponding method is used to "gather data". How that is done
is up to your program. But usually a "send_data" event is generated.

=over 4

=item Example

 sub gather_data {
     my ($kernel, $self) = @_[KERNEL,$OBJECT];
 
     # doing something here

     $kernel->yield('send_data' => $frame);

 }

=back

=item connection_down

This event and corresponding method is a hook to allow you to be notified if 
the connection to the server is currently down. By default it does nothing. 
But it would be usefull to notify "gather_data" to temporaily stop doing 
whatever it is currently doing.

=over 4

=item Example

 sub connection_down {
    my ($kernel, $self) = @_[KERNEL,OBJECT];

    # do something here

 }

=back

=item connection_up

This event and corresponding method is a hook to allow you to be notified 
when the connection to the server up. By default it does nothing. 
But it would be usefull to notify "gather_data" to start doing 
whatever it supposed to do.

=over 4

=item Example

 sub connection_up {
    my ($kernel, $self) = @_[KERNEL,OBJECT];

    # do something here

 }

=back

=item log

This method is used internally to send a log message to stdout. It can be 
overridden to hook into your perferred logging module. This module currently
uses the following levels internally: 'warn', 'error', 'debug'

=over 4

=item Example

 sub log {
     my ($self, $kernel, $level, @args) = @_;

     if ($level eq 'error') {

         $kernel->post('logger' => error => @args);

     } elsif ($level eq 'warn') {

         $kernel->post('logger' => warn => @args);

    }

 }

=back

=item handle_shutdown

This method is a hook and should be overidden to do "shutdown" stuff. By
default it sends a "DISCONNECT" message to the message queue server.

=over 4

=item Example

 sub handle_shutdown {
    my ($self, $kernel, $session) = @_;

    # do something here

 }

=back

=item handle_reload

This method is a hook and should be overidden to do "reload" stuff. By
default it executes POE's sig_handled() method.

=over 4

=item Example

 sub handle_reload {
    my ($self, $kernel, $session) = @_;

    $kernel->sig_handled();

 }

=back

=back

=head1 ACCESSORS

=over 4

=item stomp

This returns an object to the interal POE::Component::Client::Stomp::Utils 
object. This is very useful for creating Stomp frames.

=over 4

=item Example

 $frame = $self->stomp->connect({login => 'testing', 
                                 passcode => 'testing'});
 $kernel->yield('send_data' => $frame);

=back

=item config

This accessor is used to return items from the internal config. The config is
loaded from the parameters that were used when spawn() was called.

=over 4

=item Example

 $logger = $self->config('Logger');

=back

=back

=head1 SEE ALSO

 Net::Stomp::Frame
 POE::Filter::Stomp
 POE::Component::MessageQueue
 POE::Compoment::Client::Stomp::Utils;

 For information on the Stomp protocol: http://stomp.codehaus.org/Protocol

=head1 AUTHOR

Kevin L. Esteb, E<lt>kesteb@wsipc.orgE<gt>

=head1 COPYRIGHT AND LICENSE

Copyright (C) 2007 by Kevin L. Esteb

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself, either Perl version 5.8.8 or,
at your option, any later version of Perl 5 you may have available.

=cut