package XAS::Collector::Connector;
our $VERSION = '0.01';
use POE;
use Try::Tiny;
use XAS::Class
version => $VERSION,
base => 'XAS::Lib::Connector',
constants => 'TRUE FALSE ARRAY',
codec => 'JSON',
messages => {
unknownmsg => "%s: unknown protocol type: %s",
noqueues => "no Queues were defined",
notypes => "no Types were defined",
}
;
use Data::Dumper;
# ---------------------------------------------------------------------
# Public Events
# ---------------------------------------------------------------------
sub handle_connected {
my ($kernel, $self, $frame) = @_[KERNEL, OBJECT, ARG0];
my $nframe;
my $alias = $self->config('Alias');
my $queues = $self->config('Queues');
if (ref($queues) eq ARRAY) {
for my $q (@$queues) {
$nframe = $self->stomp->subscribe(
{
destination => $q,
ack => 'client'
}
);
$self->log($kernel, 'info', $self->message('subscribed', $alias, $q));
$kernel->yield('send_data', $nframe);
}
} else {
$nframe = $self->stomp->subscribe(
{
destination => $queues,
ack => 'client'
}
);
$self->log($kernel, 'info', $self->message('subscribed', $alias, $queues));
$kernel->yield('send_data', $nframe);
}
}
sub handle_message {
my ($kernel, $self, $frame) = @_[KERNEL, OBJECT, ARG0];
my $data;
my $message;
my $session;
my $alias = $self->config('Alias');
my $types = $self->config('Types');
my $message_id = $frame->headers->{'message-id'};
my $nframe = $self->stomp->ack({'message-id' => $message_id});
try {
$message = decode($frame->body);
$self->log($kernel, 'info',
$self->message(
'received',
$alias,
$message_id,
$message->{type},
$message->{hostname}
)
);
$data = $message->{data};
$session = $self->_get_session($message->{type}, $types);
if (defined($session)) {
$kernel->call($session, 'store_data', $data, $nframe);
} else {
$self->throw_msg(
'xas.collector.connector.handle_message',
'unknownmsg',
$alias, $message->{type}
);
}
} catch {
my $ex = $_;
$self->exception_handler($ex);
};
}
# ---------------------------------------------------------------------
# Public Methods
# ---------------------------------------------------------------------
sub spawn {
my $class = shift;
my %args = @_;
my $self = $class->SUPER::spawn(@_);
unless (defined($args{'Types'})) {
$self->throw_msg(
'xas.collector.connector.spawn.notypes',
'notypes'
);
}
unless (defined($args{'Queues'})) {
$self->throw_msg(
'xas.collector.connector.spawn.noqueues',
'noqueues'
);
}
return $self;
}
# ---------------------------------------------------------------------
# Private Methods
# ---------------------------------------------------------------------
sub _get_session {
my ($self, $wanted, $types) = @_;
my $key;
my $type;
my $session;
for $type ( @$types ) {
for $key ( keys %$type ) {
$session = $type->{$key} if ($key eq $wanted);
}
}
return $session;
}
1;
__END__
=head1 NAME
XAS::Collector::Connector - Perl extension for the XAS environment
=head1 SYNOPSIS
use XAS::Collector::Connector;
my $types = [
{ 'xas-alert', 'alert' },
];
my $queues = [
'/queue/alert',
];
XAS::Collector::Connector->spawn(
RemoteAddress => $host,
RemotePort => $port,
Alias => 'collector',
Logger => 'logger',
Login => 'collector',
Passcode => 'ddc',
Queues => $queues,
Types => $types
);
=head1 DESCRIPTION
This module is used for monitoring queues on the message server. When messages
are received, they are then passed off to the appropriate message handler.
=head1 METHODS
=head2 spawn
The module uses the configuration items from POE::Component::Client::Stomp
along with this additional items.
=over 4
=item B<Queues>
The queues that the connector will subscribe too. This can be a string or
an array of strings.
=item B<Types>
This is a list of XAS packet types that this connector can handle. The list
consists of hashes with the following values: XAS packet type, name of
the session handler for that packet type.
=back
=head1 PUBLIC EVENTS
=head2 handle_connected($kernel, $self, $frame)
Subscribe to the appropriate queue(s) after authentication.
=over 4
=item B<$kernel>
A handle to the POE kernel
=item B<$self>
A handle to the current object.
=item B<$frame>
The received STOMP frame.
=back
=head2 handle_message($kernel, $self, $frame)
Decode the packet type and pass it off to the appropriate message handler.
=over 4
=item B<$kernel>
A handle to the POE kernel
=item B<$self>
A handle to the current object.
=item B<$frame>
The received STOMP frame.
=back
=head1 SEE ALSO
POE::Component::Client::Stomp
L<XAS|XAS>
=head1 AUTHOR
Kevin L. Esteb, E<lt>kevin@kesteb.usE<gt>
=head1 COPYRIGHT AND LICENSE
Copyright (C) 2012 by Kevin L. Esteb
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself, either Perl version 5.8.8 or,
at your option, any later version of Perl 5 you may have available.
=cut