The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
# $Id: Thread.pm,v 1.25 2010/03/27 19:51:24 dk Exp $
package IO::Lambda::Thread;
use base qw(IO::Lambda);
use strict;
use warnings;
use Exporter;
use Socket;
use IO::Handle;
use IO::Lambda qw(:all :dev swap_frame);

our $DISABLED;
eval { require threads; };
$DISABLED = $@ if $@;

our $DEBUG = $IO::Lambda::DEBUG{thread};

our @EXPORT_OK = qw(threaded new_thread);
our %EXPORT_TAGS = (all => \@EXPORT_OK);

sub _d { "threaded(" . _o($_[0]) . ")" }

sub thread_init
{
	my ( $r, $cb, $pass_handle, @param) = @_;

	$SIG{KILL} = sub { threads-> exit(0) };
	$SIG{PIPE} = 'IGNORE';
	eval "END { IO::Lambda::__end(); };";
	warn "thread(", threads->tid, ") started\n" if $DEBUG;

	my @ret;
	eval { @ret = $cb-> (( $pass_handle ? $r : ()), @param) if $cb };

	warn "thread(", threads->tid, ") ended: [@ret]\n" if $DEBUG;
	close($r);
	undef $r;
	die $@ if $@;

	return @ret;
}

sub new_thread
{
	return undef, $DISABLED if $DISABLED;

	my ( @args, $cb, $pass_handle, @param);
	@args = shift if $_[0] and ref($_[0]) and ref($_[0]) eq 'HASH';
	( $cb, $pass_handle, @param) = @_;
	
	my $r = IO::Handle-> new;
	my $w = IO::Handle-> new;
	socketpair( $r, $w, AF_UNIX, SOCK_STREAM, PF_UNSPEC);
	$w-> blocking(0);

	my ($t) = threads-> create(
		@args,
		\&thread_init, 
		$r, $cb, $pass_handle, @param
	);

	close($r);

	warn "new thread(", $t->tid, ")\n" if $DEBUG;
	return ($t, $w);
}

# overridden IO::Lambda methods

sub DESTROY
{
	my $self = shift;

	return if defined($self->{tid}) and $self->{tid} != threads-> tid;

	close($self->{socket}) if $self-> {socket};
	delete @{$self}{qw(socket thread)};

	$self-> SUPER::DESTROY;
}

sub thread { $_[0]-> {thread} }
sub socket { $_[0]-> {socket} }

sub threaded(&)
{
	my $cb = shift;

	# use overridden IO::Lambda, because we need 
	# give the caller a chance to join
	# for it, if the lambda gets terminated
	__PACKAGE__-> new( sub { 
		# Save context. This is needed because the caller
		# may have his own this. lambda(&) does the same
		# protection
		my $this  = shift;
		my @frame = swap_frame($this);

		warn _d($this), " started\n" if $DEBUG;

		# can start a thread?
		my ( $t, $r) = new_thread( $cb, 1 );
		return $r unless $t;

		# save this
		$this-> {tid}    = threads-> tid;
		$this-> {thread} = $t;
		$this-> {socket} = $r;

		# now wait
		context $this-> {socket};
		readable {
			my $this = this;
			delete $this-> {thread};
			close($this-> {socket});
			delete @{$this}{qw(socket thread)};
			$this-> clear;
			warn _d($this), " joining\n" if $DEBUG;
			$t-> join;
		};

		# restore context
		swap_frame(@frame);
	});
}

1;

__DATA__

=pod

=head1 NAME

IO::Lambda::Thread - wait for blocking code using threads

=head1 DESCRIPTION

The module implements a lambda wrapper that allows to asynchronously wait for
a blocking code. The wrapping is done so that the code is executed in another
thread's context. C<IO::Lambda::Thread> provides bidirectional communication
between threads, that is based on a shared socket between parent and child
threads. The socket can be also used by the caller for its own needs, if necessary
( see L<IO::Lambda::Message> ).

=head1 SYNOPSIS

    use IO::Lambda qw(:lambda);
    use IO::Lambda::Thread qw(threaded);

    lambda {
        context 0.1, threaded {
	      sleep(1);
	      return "hello!";
	};
        any_tail {
            if ( @_) {
                print "done: ", $_[0]-> peek, "\n";
            } else {
                print "not yet\n";
                again;
            }
        };
    }-> wait;

=head1 API

=over

=item new_thread ( $options = (), $code, $pass_socket, @param) -> ($thread, $socket)

A special replacement for C<< thread-> create >>, that not only creates a
thread, but also creates a socket between the parent and child threads. The
socket is important for getting an asynchronous notification when the child
thread has finished, because there is no portable way to get that signal
otherwise. That means that this socket must be closed and the thread must be
C<join>'ed to avoid problems. For example:
    
    my ( $thread, $reader) = new_thread( $sub {
        my $writer = shift;
        print $writer, "Hello world!\n";
    }, 1 );
    print while <$reader>;
    close($reader);
    $thread-> join;

Note that C<join> is a blocking call, so one needs to be sure that the thread
indeed is finished before joining it. By default, the child thread closes
its side of the socket, thus making the parent side readable. However, the
child code can also hijack the socket for its own needs, so if that
functionality is needed, one must create an extra layer of communication that
ensures that the child code is properly exited, so that the parent can
reliably call C<join> without blocking (see L<IO::Lambda::Message>, that 
is destined exactly for this use).

C<$code> is executed in another thread's context, and is passed the communication
socket ( if C<$pass_socket> is set to 1 ). C<$code> is also passed C<@param>.
Data returned from the code can be retrieved from C<join>.

=item threaded($code) :: () -> ( @results )

Creates a lambda, that executes C<$code> in a newly created thread.
The lambda finishes when the C<$code> and the thread are finished,
and returns results returned by C<$code>.

Note, that this lambda, if C<terminate>'d between after being started and
before being finished, has no chance to wait for completion of the
associated thread, and so Perl will complain. To deal with that, obtain the
thread object manually and wait for the thread:

    my $l = threaded { 42 };
    $l-> start;
    ....
    $l-> terminate;

    # synchronously
    $l-> thread-> join;

    # or asynchronously
    context $l-> socket;
    readable { $l-> thread-> join };

=item thread($lambda)

Returns the associated thread object. Valid only for lambdas created with
C<threaded>.

=item socket($lambda)

Returns the associated communication socket. Valid only for lambdas created
with C<threaded>. 

=back

=head1 BUGS

=over

=item Unbalanced string table refcount

Threading in Perl is fragile, so errors like the following:

   Unbalanced string table refcount: (1) for "GEN1" during global
   destruction

are due to some obscure Perl bugs. They are triggered, in my experience, when a
child thread tries to deallocate scalars that it thinks belongs to that thread.
This can be sometimes avoided with explicit cleaning up of scalars that may be
visible in threads.  For example, calls as

   IO::Lambda::clear

and

   undef $my_lambda; # or other scalars, whatever

inexplicably hush these errors.

=item Perl exited with active threads

Errors like this

  Perl exited with active threads:
        1 running and unjoined
        0 finished and unjoined
        0 running and detached

are triggered when child threads weren't properly joined. Make sure
your lambdas are finished properly. Use C<env IO_LAMBDA_DEBUG=thread>
to find out the details.

=item Scalars leaked: 1

This is a known bug, f.ex. L<http://rt.perl.org/rt3//Public/Bug/Display.html?id=70974>
suggests adding the C<@_ = ();> construct at random places as a workaround.

=item panic: del_backref during global destruction

This is a known bug, L<http://rt.perl.org/rt3/Ticket/Display.html?id=70748> .
I observed in on win32 only. No workaround is known however.

=item AnyEvent

AnyEvent doesn't work with threads, so this module most probably won't
work too when AnyEvent is selected for IO::Lambda::Loop.

=back

=head1 SEE ALSO

L<IO::Lambda>, L<threads>.

=head1 AUTHOR

Dmitry Karasik, E<lt>dmitry@karasik.eu.orgE<gt>.

=cut