# $Id: TCP.pm 446 2004-12-27 00:57:57Z sungo $
package POE::Component::Server::Syslog::TCP;

use warnings;
use strict;

our $VERSION = '1.20';

sub BINDADDR        () { '0.0.0.0' }
sub BINDPORT        () { 514 }
sub DATAGRAM_MAXLEN () { 1024 }  # syslogd defaults to this. as do most 
                                 # libc implementations of syslog

use Params::Validate qw(validate_with);
use Carp qw(carp croak);
use Socket;

use POE qw(
	Driver::SysRW
	Wheel::SocketFactory
	Wheel::ReadWrite
	Filter::Syslog
);


sub spawn {
	my $class = shift;

	my %args = validate_with(
		params => \@_,
		spec => {
			InputState   => {
				type     => &Params::Validate::CODEREF,
				optional => 1,
				default  => sub {},
			},
			ErrorState   => {
				type     => &Params::Validate::CODEREF,
				optional => 1,
				default  => sub {},
			},
			BindAddress  => {
				type     => &Params::Validate::SCALAR,
				optional => 1,
				default  => BINDADDR,
			},
			BindPort     => {
				type     => &Params::Validate::SCALAR,
				optional => 1,
				default  => BINDPORT,
			},
			MaxLen       => {
				type     => &Params::Validate::SCALAR,
				optional => 1,
				default  => DATAGRAM_MAXLEN,
			},
			Alias  		 => {
				type     => &Params::Validate::SCALAR,
				optional => 1,
			},
		},
	);

	$args{type} = 'tcp';
	$args{filter} = POE::Filter::Syslog->new();

	my $sess = POE::Session->create(
		inline_states => {
			_start         => \&start,
			_stop          => \&shutdown,

			socket_connect => \&socket_connect,
			socket_error   => \&socket_error,
			socket_input   => \&socket_input,
			register	   => \&register,
			unregister	   => \&unregister,
			shutdown       => \&shutdown,

			client_input => $args{InputState},
			client_error => $args{ErrorState},

		},
		heap => \%args,
	);


	return $sess;
}

sub start {
	$_[HEAP]->{socketfactory} = POE::Wheel::SocketFactory->new(
		BindAddress  => $_[HEAP]->{BindAddress},
		BindPort     => $_[HEAP]->{BindPort},
		SuccessEvent => 'socket_connect',
		FailureEvent => 'client_error',
		ListenQueue  => $_[HEAP]->{MaxLen},
		Reuse        => 'yes',
	);

	unless($_[HEAP]->{socketfactory}) {
		croak("Unable to setup socketfactory");
	}
	$_[KERNEL]->alias_set( $_[HEAP]->{Alias} ) if $_[HEAP]->{Alias};
    return;
}

sub socket_connect {
	my $handle = $_[ARG0];
	my $host;

	if( ( sockaddr_in( getpeername($handle) ) )[1]) {
		$host = gethostbyaddr( ( sockaddr_in( getpeername($handle) ) )[1], AF_INET );
	} 
    else {
		$host = '[unknown]';
	}

	my $wheel = POE::Wheel::ReadWrite->new(
		Handle     => $handle,
		Driver     => POE::Driver::SysRW->new(),
		Filter     => POE::Filter::Syslog->new(),
		InputEvent => 'socket_input',
		ErrorEvent => 'socket_error',
	);

	$_[HEAP]->{wheels}->{ $wheel->ID } = {
		wheel => $wheel,
		host  => $host,
	};
    return;
}

sub socket_error {
	my ($errop, $errnum, $errstr, $wid) = @_[ARG0 .. ARG3];
	unless( ($errnum == 0) && ($errop eq 'read') ) {
		$_[KERNEL]->yield( 'client_error', $errop, $errnum, $errstr );
	}
	delete $_[HEAP]->{wheels}->{ $wid };
    return;
}

sub socket_input {
	my ($input, $wid) = @_[ARG0, ARG1];
	my $info = $_[HEAP]->{wheels}->{ $wid };

	if(ref $input && ref $input eq 'ARRAY') {
		foreach my $record (@{ $input }) {
			$input->{host} = $info->{host};
			$_[KERNEL]->yield( 'client_input', $record );
		}
	} 
    elsif(ref $input && ref $input eq 'HASH') {
		$input->{host} = $info->{host};
		$_[KERNEL]->yield( 'client_input', $input );
		$_[KERNEL]->post( $_, $_[HEAP]->{sessions}->{$_}->{inputevent}, $input )
			for keys %{ $_[HEAP]->{sessions} };
	} 
    else {
		$_[KERNEL]->yield( 'client_error', $input );
		$_[KERNEL]->post( $_, $_[HEAP]->{sessions}->{$_}->{errorevent}, $input )
			for grep { defined $_[HEAP]->{sessions}->{errorevent} } 
			    keys %{ $_[HEAP]->{sessions} };
	}
    return;
}

sub shutdown {
    my ($kernel,$heap) = @_[KERNEL,HEAP];
	if($heap->{socketfactory}) {
		$heap->{socketfactory}->pause_accept();
		delete $heap->{socketfactory};
	}
	delete $heap->{wheels};
	$kernel->alarm_remove_all();
	$kernel->alias_remove( $_ ) for $kernel->alias_list();
	$kernel->refcount_decrement( $_, __PACKAGE__ ) 
		for keys %{ $heap->{sessions} };
    return;
}

sub register {
  my ($kernel,$self,$sender) = @_[KERNEL,HEAP,SENDER];
  my $sender_id = $sender->ID();
  my %args;
  if ( ref $_[ARG0] eq 'HASH' ) {
    %args = %{ $_[ARG0] };
  }
  elsif ( ref $_[ARG0] eq 'ARRAY' ) {
    %args = @{ $_[ARG0] };
  }
  else {
    %args = @_[ARG0..$#_];
  }
  $args{lc $_} = delete $args{$_} for keys %args;
  unless ( $args{inputevent} ) {
    warn "No 'inputevent' argument supplied\n";
    return;
  }
  if ( defined $self->{sessions}->{ $sender_id } ) {
    $self->{sessions}->{ $sender_id } = \%args;
  }
  else {
    $self->{sessions}->{ $sender_id } = \%args;
    $kernel->refcount_increment( $sender_id, __PACKAGE__ );
  }
  return;
}

sub unregister {
  my ($kernel,$self,$sender) = @_[KERNEL,HEAP,SENDER];
  my $sender_id = $sender->ID();
  my %args;
  if ( ref $_[ARG0] eq 'HASH' ) {
    %args = %{ $_[ARG0] };
  }
  elsif ( ref $_[ARG0] eq 'ARRAY' ) {
    %args = @{ $_[ARG0] };
  }
  else {
    %args = @_[ARG0..$#_];
  }
  $args{lc $_} = delete $args{$_} for keys %args;
  my $data = delete $self->{sessions}->{ $sender_id };
  $kernel->refcount_decrement( $sender_id, __PACKAGE__ ) if $data;
  return;
}

1;
__END__

=pod

=head1 NAME

POE::Component::Server::Syslog::TCP

=head1 SYNOPSIS

    POE::Component::Server::Syslog::TCP->spawn(
        BindAddress => '127.0.0.1',
        BindPort    => '514',
        InputState  => \&input,
    );

    sub input {
        my $message = $_[ARG0];
        # .. do stuff ..
    }

=head1 DESCRIPTION

This component provides very simple syslog services for POE.

=head1 CONSTRUCTOR

=head2 spawn()

Spawns a new listener. For a standalone syslog server you may specify 
C<InputState> option to register a subroutine that will be called on
input events.

For integration with other POE Sessions and Components you may use the 
C<register> and C<unregister> states to request that input events be
sent to your sessions.

Returns the POE::Session object it creates.

C<spawn()> accepts the following options:

=over 4

=item * InputState

Requires one argument, C<InputState>, which must
be a reference to a subroutine. This argument will become a POE state
that will be called when input from a syslog client has been recieved.

=item * BindAddress

The address to bind the listener to. Defaults to 0.0.0.0

=item * BindPort

The port number to bind the listener to. Defaults to 514

=item * MaxLen

The maximum length of a datagram. Defaults to 1024, which is the usual
default of most syslog and syslogd implementations.

=item * ErrorState

An optional code reference. This becomes a POE state that will get
called when the component recieves a message it cannot parse. The
erroneous message is passed in as ARG0.

=item * Alias

Optionally specify that the component use the supplied alias.

=back

=head2 InputState

The ClientInput routine obtained by C<spawn()> will be passed a hash
reference as ARG0 containing the following information:

=over 4

=item * time

The time of the datagram (as specified by the datagram itself)

=item * pri

The priority of message.

=item * facility

The "facility" number decoded from the pri.

=item * severity

The "severity" number decoded from the pri.

=item * host

The host that sent the message.

=item * msg

The message itself. This often includes a process name, pid number, and
user name.

=back

=head1 INPUT EVENTS

These are events that this component will accept.

=head2 register

This will register the sending session to receive InputEvent and ErrorEvents from the
component.

Takes a number of parameters:

=over 4

=item * InputEvent

Mandatory parameter, the name of the event in the registering session that will be triggered
for input from clients. ARG0 will contain a hash reference. See C<InputHandler> for details.

=item * ErrorEvent 

Optional parameter, the name of the event in the registering session that will be triggered
for input that cannot be parsed. ARG0 will contain the erroneous message.

=back

The component will increment the refcount of the calling session to make sure it hangs around for events.
Therefore, you should use either C<unregister> or C<shutdown> to terminate registered sessions.

=head2 unregister

This will unregister the sending session from receiving events.

=head2 shutdown

Termintes the component.

=head1 DATE

$Date: 2004-12-26 19:57:57 -0500 (Sun, 26 Dec 2004) $

=head1 REVISION

$Rev: 446 $

Note: This does not necessarily correspond to the distribution version number.


=head1 AUTHOR

Matt Cashner (sungo@cpan.org)

=head1 LICENSE

Copyright (c) 2003-2004, Matt Cashner. All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

=over 4

=item * Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.

=item * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.

=item * Neither the name of the Matt Cashner nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.

=back

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

=cut

1;
__END__

# sungo // vim: ts=4 sw=4 noexpandtab