The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
## ----------------------------------------------------------------------------
#  IO::Tail
# -----------------------------------------------------------------------------
# Mastering programmed by YAMASHINA Hio
#
# Copyright 2007 YAMASHINA Hio
# -----------------------------------------------------------------------------
# $Id$
# -----------------------------------------------------------------------------
package IO::Tail;
use strict;
use warnings;

use IO::Poll qw(POLLIN POLLERR POLLHUP POLLNVAL);
our $SEEK_SET = 0;
our $SEEK_END = 2;
our $POLL_FLAGS = POLLIN | POLLERR | POLLHUP | POLLNVAL;

our $VERSION = '0.01';

1;


=begin COMMENT

format for an item of IO::Tail.

$type is one of "handle", "file", "timeout", "interval".

common case:

	my $item = {
		type     => $type,
		name     => $name, # e.g. "$type:$obj".
		callback => $callback,
		buffer   => '',
		_read    => \&_read_handle,
	};

when type is "handle":

	my $item = {
		type     => 'handle',
		name     => "handle:$handle",
		handle   => $handle,
		callback => $callback,
		buffer   => '',
		_read => \&_read_handle,
	};

when type is "file":

	my $item = {
		type     => 'file',
		name     => $file,
		handle   => $fh,
		pos      => $pos,
		callback => $callback,
		buffer   => '',
		_read => \&_read_file,
	};

when type is "timeout":

	my $item = {
		type     => 'timeout',
		name     => "timeout:$callback",
		interval => $timeout_secs,
		timeout  => $next_timeout,
		callback => $callback,
	};

when type is "interval":

	my $item = {
		type   => 'interval',
		name   => "interval:$callback",
		interval => $interval_secs,
		timeout  => $next_timeout,
		callback => $callback,
	};

=end COMMENT

=cut

# -----------------------------------------------------------------------------
# $pkg->new();
#
sub new
{
	my $pkg = shift;
	my $this = bless {}, $pkg;
	$this->{poll}    = undef;
	$this->{handles} = {};
	$this->{files}   = [];
	$this->{timeout} = {};
	
	$this;
}

# -----------------------------------------------------------------------------
# $tail->add($obj, $callback, $opts);
#
sub add
{
	my $this = shift;
	$this->_do('add', @_);
}

# -----------------------------------------------------------------------------
# $tail->remove($obj);
#
sub remove
{
	my $this = shift;
	$this->_do('remove', @_);
}

# -----------------------------------------------------------------------------
# $tail->_do($cmd, $obj, $callback, $opts);
#
sub _do
{
	my $this = shift;
	my $cmd  = shift;
	my $obj  = shift;
	
	$cmd =~ /^(?:add|remove)\z/ or die "_do: $cmd";
	my $type;
	if( UNIVERSAL::isa($obj, 'GLOB') )
	{
		$type = 'handle';
	}elsif( UNIVERSAL::isa($obj, 'HASH') )
	{
		$type = $obj->{type};
		unshift @_, $obj;
		$obj = $type =~ /^(file|handle)$/ ? $obj->{$type} : $obj->{callback};
	}else
	{
		$type = 'file';
	}
	my $subname = "${cmd}_${type}";
	my $sub = $this->can($subname) or die "_do: $subname";
	$this->$sub($obj, @_);
}

# -----------------------------------------------------------------------------
# $tail->add_handle($handle, $callback, $opts);
#
sub add_handle
{
	my $this     = shift;
	my $handle   = shift;
	my $callback = shift;
	my $opts     = shift;
	
	my $poll = $this->{poll} ||= IO::Poll->new();
	$handle->blocking(0);
	$poll->mask($handle, POLLIN);
	
	my $item = {
		type     => 'handle',
		name     => "handle:$handle",
		handle   => $handle,
		callback => $callback,
		buffer   => '',
		_read => \&_read_handle,
	};
	$this->{handles}{$handle} = $item;
	
	$this;
}

# -----------------------------------------------------------------------------
# $tail->remove_handle($handle);
#
sub remove_handle
{
	my $this   = shift;
	my $handle = shift;
	if( my $item = delete $this->{handles}{$handle} )
	{
		my $poll = $this->{poll};
		$poll->remove($handle);
		delete $this->{handles}{$handle};
		
		if( keys %{$this->{handles}}==0 )
		{
			$this->{poll} = undef;
		}
	}
	$this;
}

# -----------------------------------------------------------------------------
# $tail->_read_handle($item);
#
sub _read_handle
{
	my $this = shift;
	my $item = shift;
	my $handle = $item->{handle};
	READ:{
		my $len = sysread($handle, $item->{buffer}, 1024, length $item->{buffer});
		if( $len )
		{
			my $ret = $this->_callback_read($item);
			$ret or return; # quit.
			redo READ;
		}
		if( defined($len) )
		{
			# eof.
			return;
		}
		$!{EAGAIN} and last READ;
		die "sysread: $item->{name}: $!";
	}
	1;
}

# -----------------------------------------------------------------------------
# $tail->add_file($file, $callback, $opts);
#
sub add_file
{
	my $this = shift;
	my $file = shift;
	my $callback = shift;
	my $opts     = shift;
	
	if( $file eq '-' )
	{
		return $this->add_handle(\*STDIN, $callback, @_);
	}
	
	open(my $fh, '<', $file) or die "$file: $!";
	my $pos = sysseek($fh, 0, $SEEK_END) || 0;
	my $item = {
		type     => 'file',
		name     => $file,
		handle   => $fh,
		pos      => $pos,
		callback => $callback,
		buffer   => '',
		_read => \&_read_file,
	};
	push(@{$this->{files}}, $item);
	
	$this;
}

# -----------------------------------------------------------------------------
# $tail->remove_file($file);
#
sub remove_file
{
	my $this   = shift;
	my $file = shift;
	
	if( $file eq '-' )
	{
		return $this->remove_handle(\*STDIN, @_);
	}
	
	my $files = $this->{files};
	foreach my $item (@$files)
	{
		$item->{name} eq $file or next;
		$item = undef;
	}
	@$files = grep {$_} @$files;
	$this;
}

# -----------------------------------------------------------------------------
# $tail->_read_file($item);
#
sub _read_file
{
	my $this = shift;
	my $item = shift;
	
	my $pos = sysseek($item->{handle}, 0, $SEEK_END);
	defined($pos) or die "sysseek: $item->{name}: $!";
	if( $pos==$item->{pos} )
	{
		return 1;
	}
	sysseek($item->{handle}, $item->{pos}, $SEEK_SET);
	
	my $len = sysread($item->{handle}, $item->{buffer}, $pos-$item->{pos}, length $item->{buffer});
	$item->{pos} = $pos;
	if( $len )
	{
		my $ret = $this->_callback_read($item);
		$ret or return; # quit.
	}elsif( defined($len) )
	{
		# eof.
		return;
	}else
	{
		$!{EAGAIN} and last READ;
		die "sysread: $item->{name}: $!";
	}
	1;
}

# -----------------------------------------------------------------------------
# $tail->add_timeout($callback, $timeout_secs, $opts);
#
sub add_timeout
{
	my $this = shift;
	my $callback = shift;
	my $timeout_secs = shift;
	my $opts = shift;
	
	my $next_timeout = time + $timeout_secs;
	my $item = {
		type     => 'timeout',
		name     => "timeout:$callback",
		interval => $timeout_secs,
		timeout  => $next_timeout,
		callback => $callback,
	};
	$this->{timeout}{$item} = $item;
	$this;
}

# -----------------------------------------------------------------------------
# $tail->remove_timeout($callback);
#
sub remove_timeout
{
	my $this     = shift;
	my $callback = shift;
	my $timeout = $this->{timeout};
	foreach my $item (values %$timeout)
	{
		$item->{callback} eq $callback or next;
		delete $timeout->{$item};
	}
	$this;
}

# -----------------------------------------------------------------------------
# $tail->add_interval($callback, $interval_secs, $opts);
#
sub add_interval
{
	my $this = shift;
	my $callback = shift;
	my $interval_secs = shift;
	my $opts = shift;
	
	my $next_timeout = time + $interval_secs;
	my $item = {
		type   => 'interval',
		name   => "interval:$callback",
		interval => $interval_secs,
		timeout  => $next_timeout,
		callback => $callback,
	};
	$this->{timeout}{$item} = $item;
	$this;
}

# -----------------------------------------------------------------------------
# $tail->remove_interval($callback);
#
sub remove_interval
{
	my $this     = shift;
	my $callback = shift;
	my $timeout = $this->{timeout};
	foreach my $item (values %$timeout)
	{
		$item->{callback} eq $callback or next;
		delete $timeout->{$item};
	}
	$this;
}

# -----------------------------------------------------------------------------
# $tail->_callback_read($item) @ private.
#
sub _callback_read
{
	my $this = shift;
	my $item = shift;
	scalar $item->{callback}->(\$item->{buffer}, undef, $item->{args}, $item);
}

# -----------------------------------------------------------------------------
# $tail->_callback_eof($item) @ private.
#
sub _callback_eof
{
	my $this = shift;
	my $item = shift;
	$item && $item->{callback} or return;
	scalar $item->{callback}->(\$item->{buffer}, 'eof', $item->{args}, $item);
}

# -----------------------------------------------------------------------------
# $tail->check();
#
sub check
{
	my $this = shift;
	
	# check_handles.
	if( my $poll = $this->{poll} )
	{
		my $ev = $poll->poll(0);
		$ev==-1 and die "poll: $!";
		foreach my $handle ($poll->handles($POLL_FLAGS))
		{
			my $item = $this->{handles}{$handle};
			my $ret = $item->{_read}->($this, $item);
			if( !$ret )
			{
				$this->_callback_eof($item);
				$poll->remove($handle);
				delete $this->{handles}{$handle};
			}
		}
		if( keys %{$this->{handles}}==0 )
		{
			$this->{poll} = undef;
		}
	}
	
	# check files.
	if( my $files = $this->{files} )
	{
		foreach my $item (@$files)
		{
			my $ret = $item->{_read}->($this, $item);
			if( !$ret )
			{
				$this->_callback_eof($item);
				$item = undef;
			}
		}
		@$files = grep {$_} @$files;
	}
	
	# check timeouts and intervals.
	if( my $timeout = $this->{timeout} )
	{
		my $now = time;
		foreach my $item (values %$timeout)
		{
			if( $now > $item->{timeout} )
			{
				$item->{callback}->(undef, undef, $item->{args}, $item);
				if( $item->{type} eq 'interval' )
				{
					$item->{timeout} = $now + $item->{interval};
				}else
				{
					delete $timeout->{$item};
				}
			}
		}
	}
	
	($this->{poll} || @{$this->{files}} || keys %{$this->{timeout}}) && 1;
}

# -----------------------------------------------------------------------------
# $tail->loop();
#
sub loop
{
	my $this = shift;
	my $timeout_secs = shift;
	my $enter_at = time;
	
	while( $this->check() )
	{
		if( defined($timeout_secs) && time - $enter_at > $timeout_secs )
		{
			last;
		}
		select(undef,undef,undef,0.1);
	}
}


# -----------------------------------------------------------------------------
# End of Module.
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# End of File.
# -----------------------------------------------------------------------------
__END__

=encoding utf8

=for stopwords
	YAMASHINA
	Hio
	ACKNOWLEDGEMENTS
	AnnoCPAN
	CPAN
	RT

=head1 NAME

IO::Tail - follow the tail of files/stream

=head1 VERSION

Version 0.01

=head1 SYNOPSIS

 use IO::Tail;
 my $tail = IO::Tail->new();
 $tail->add(\*STDIN, \&callback);
 $tail->add('test.log', \&callback);
 $tail->check();
 $tail->loop();

=head1 EXPORT

No functions are exported.

=head1 METHODS

=head2 $pkg->new()

=head2 $pkg->add($obj, $callback);

=head2 $pkg->remove($obj);

=head2 $pkg->check();

=head2 $pkg->loop();

=head2 $pkg->add_handle($handle, $callback);

=head2 $pkg->remove_handle($handle);

=head2 $pkg->add_file($file, $callback);

=head2 $pkg->remove_file($file);

=head2 $pkg->add_timeout($callback, $timeout_secs);

=head2 $pkg->remove_timeout($callback);

=head2 $pkg->add_interval($callback, $interval_secs);

=head2 $pkg->remove_interval($callback);

=head1 AUTHOR

YAMASHINA Hio, C<< <hio at cpan.org> >>

=head1 BUGS

Please report any bugs or feature requests to
C<bug-io-tail at rt.cpan.org>, or through the web interface at
L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=IO-Tail>.
I will be notified, and then you'll automatically be notified of progress on
your bug as I make changes.

=head1 SUPPORT

You can find documentation for this module with the perldoc command.

    perldoc IO::Tail

You can also look for information at:

=over 4

=item * AnnoCPAN: Annotated CPAN documentation

L<http://annocpan.org/dist/IO-Tail>

=item * CPAN Ratings

L<http://cpanratings.perl.org/d/IO-Tail>

=item * RT: CPAN's request tracker

L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=IO-Tail>

=item * Search CPAN

L<http://search.cpan.org/dist/IO-Tail>

=back

=head1 ACKNOWLEDGEMENTS

=head1 COPYRIGHT & LICENSE

Copyright 2007 YAMASHINA Hio, all rights reserved.

This program is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.

=cut

1; # End of IO::Tail